feat: sharding executor exec locally
This commit is contained in:
@@ -8,6 +8,7 @@ public abstract class AbstractContextContractExecutor implements ContractExecuto
|
||||
static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf();
|
||||
static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions();
|
||||
static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager();
|
||||
static protected IMasterClientTCPAction masterClientTCPAction = ConsistencyPluginManager.getContext().getMasterClientTCPAction();
|
||||
static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction();
|
||||
static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction();
|
||||
}
|
||||
|
||||
@@ -65,13 +65,13 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
|
||||
count); // 把count改成了1,设置成获得1个响应就行
|
||||
}
|
||||
|
||||
public void sendRequest(String id, ContractRequest req, String[] nodes) {
|
||||
Map<String, Object> reqStr = new HashMap<>();
|
||||
reqStr.put("uniReqID", id);
|
||||
reqStr.put("data", req);
|
||||
public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) {
|
||||
req.needSeq = false;
|
||||
reqStr.put("action", "executeContractLocally");
|
||||
String sendStr = JsonUtil.toJson(reqStr);
|
||||
JsonObject jo = new JsonObject();
|
||||
jo.addProperty("uniReqID", id);
|
||||
jo.add("data", JsonUtil.parseObject(req));
|
||||
jo.addProperty("action", "executeContractLocally");
|
||||
String sendStr = jo.toString();
|
||||
// master负责缓存请求
|
||||
if (!masterServerTCPAction.getReqCache().containsKey(contractID)) {
|
||||
masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
|
||||
@@ -82,13 +82,17 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
|
||||
LOGGER.info("node size = " + nodes.length);
|
||||
LOGGER.debug("nodes:" + JsonUtil.toJson(nodes));
|
||||
for (String node : nodes) {
|
||||
LOGGER.info(
|
||||
"[sendRequests] get cmNode "
|
||||
+ node.substring(0, 5)
|
||||
+ " not null "
|
||||
+ "RequestAllExecutor 发送请求给 "
|
||||
+ node.substring(0, 5));
|
||||
networkManager.sendToAgent(node, sendStr);
|
||||
if (node.equals(globalConf.getNodeID())) {
|
||||
masterClientTCPAction.asyncExecuteContractLocally(jo, rc);
|
||||
} else {
|
||||
LOGGER.info(
|
||||
"[sendRequests] get cmNode "
|
||||
+ node.substring(0, 5)
|
||||
+ " not null "
|
||||
+ "RequestAllExecutor 发送请求给 "
|
||||
+ node.substring(0, 5));
|
||||
networkManager.sendToAgent(node, sendStr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,7 +222,7 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
|
||||
createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器
|
||||
masterServerTCPAction.getSync().sleep(id, collector);
|
||||
LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
|
||||
sendRequest(id, req, nodes); // 发送请求
|
||||
sendRequest(id, req, nodes, collector); // 发送请求
|
||||
} else {
|
||||
LOGGER.info("invalidNodeNumOnResult");
|
||||
request_index.getAndDecrement();
|
||||
|
||||
Reference in New Issue
Block a user