fix: fix bugs in SelfAdaptiveSharding contracts

fix bugs in SelfAdaptiveShardingExecutor to support contracts of this type; add return value in SelfAdaptiveShardingExecutor.execute; remove DHT-style selecting of nodes to deploy SelfAdaptiveSharding contracts on; stop executor when killing contracts
This commit is contained in:
Frank.R.Wu 2021-11-17 17:40:06 +08:00
parent 5c19c1570d
commit 4423af8d1c
3 changed files with 45 additions and 24 deletions

View File

@ -140,27 +140,28 @@ public class MasterWSAction {
}
*/
List<String> nodeNames; // nodes' peerID
if (contract.getType().equals(ContractExecType.SelfAdaptiveSharding)) {
if (ContractManager.instance.nodeCenterConn.listNodes().length < 3) {
rc.onResult(
"{\"status\":\"Error\",\"result\":\"No enough nodes!\","
+ "\"action\":\"onStartTrustfulContract\"}");
return;
}
int unitSize = args.get("selectUnitNum").getAsInt();
nodeNames =
Arrays.asList(ContractManager.instance.nodeCenterConn.getClusterByKey(
contractID,
Math.max(unitSize, 3)));
} else {// all nodes' peerID in the unit
String[] nodeNamesStr =
args.get("peersID").getAsString().split(",");
// record information of these nodes
nodeNames =
Arrays.stream(nodeNamesStr)
.filter(x -> null != x && !x.isEmpty())
.collect(Collectors.toList());
}
// if (contract.getType().equals(ContractExecType.SelfAdaptiveSharding)) {
// if (ContractManager.instance.nodeCenterConn.listNodes().length < 3) {
// rc.onResult(
// "{\"status\":\"Error\",\"result\":\"No enough nodes!\","
// + "\"action\":\"onStartTrustfulContract\"}");
// return;
// }
// int unitSize = args.get("selectUnitNum").getAsInt();
// nodeNames =
// Arrays.asList(ContractManager.instance.nodeCenterConn.getClusterByKey(
// contractID,
// Math.max(unitSize, 3)));
// } else {
// all nodes' peerID in the unit
String[] nodeNamesStr =
args.get("peersID").getAsString().split(",");
// record information of these nodes
nodeNames =
Arrays.stream(nodeNamesStr)
.filter(x -> null != x && !x.isEmpty())
.collect(Collectors.toList());
// }
int nodeSize = nodeNames.size();
// 方式一向NodeCenter发要求Slave节点主动连接到Master节点.

View File

@ -78,6 +78,11 @@ public class MasterClientTCPAction {
} finally {
CMActions.manager.stopContractWithOwner(
request.get("verifiedPubKey").getAsString(), client.getContractID());
MultiContractMeta cei =
CMActions.manager.multiContractRecorder.getMultiContractMeta(client.getContractID());
if (null != cei && cei.contractExecutor instanceof SelfAdaptiveShardingExecutor) {
((SelfAdaptiveShardingExecutor) cei.contractExecutor).close();
}
}
}

View File

@ -1,9 +1,11 @@
package org.bdware.server.trustedmodel;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.ContractManager;
import org.bdware.sc.ContractResult;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.units.MultiContractMeta;
@ -46,6 +48,8 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
TimeUnit.SECONDS);
ContractManager.threadPool.submit(() -> {
while (running) {
LOGGER.debug("latest block=" + this.b.prevHash + ", to be executed size=" + toExecuted.size());
LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs));
while (!toExecuted.isEmpty()) {
String key = this.b.prevHash;
Block block = toExecuted.get(key);
@ -76,10 +80,17 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
@Override
public void execute(String requestID, ResultCallback rc, ContractRequest req) {
if (executedTxs.containsKey(requestID)) {
rc.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Error,
new JsonPrimitive("this request has been packed!"))));
return;
}
LOGGER.debug("receive contract request " + requestID);
executedTxs.put(requestID, false);
reqQueue.add(req);
rc.onResult(JsonUtil.toJson(new ContractResult(
ContractResult.Status.Executing,
new JsonPrimitive("this request is adding into blocks"))));
if (reqQueue.size() >= SUBMIT_LIMIT) {
submitBlock();
}
@ -105,14 +116,17 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
}
private synchronized void executeBlock(Block block) {
LOGGER.debug("start");
for (ContractRequest request : block.requests) {
if (executedTxs.containsKey(request.getContractID()) && executedTxs.get(request.getContractID())) {
if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) {
LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
return;
}
}
for (ContractRequest request : block.requests) {
ContractManager.instance.executeLocally(request, null);
executedTxs.put(request.getContractID(), true);
String ret = ContractManager.instance.executeLocally(request, null);
LOGGER.debug("result of request " + request.getRequestID() + ": " + ret);
executedTxs.put(request.getRequestID(), true);
}
this.b = new Block(block.hash);
executedBlocks.add(block.hash);
@ -122,6 +136,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor {
ContractManager.threadPool.execute(() -> {
Block block = fillBlock();
if (null != block) {
LOGGER.debug(JsonUtil.toPrettyJson(block));
String[] nodes = this.meta.getMembers();
JsonObject req = new JsonObject();
req.addProperty("action", "deliverBlock");