diff --git a/src/main/java/org/bdware/server/action/MasterWSAction.java b/src/main/java/org/bdware/server/action/MasterWSAction.java index 59e79c7..60945c9 100644 --- a/src/main/java/org/bdware/server/action/MasterWSAction.java +++ b/src/main/java/org/bdware/server/action/MasterWSAction.java @@ -140,27 +140,28 @@ public class MasterWSAction { } */ List nodeNames; // nodes' peerID - if (contract.getType().equals(ContractExecType.SelfAdaptiveSharding)) { - if (ContractManager.instance.nodeCenterConn.listNodes().length < 3) { - rc.onResult( - "{\"status\":\"Error\",\"result\":\"No enough nodes!\"," - + "\"action\":\"onStartTrustfulContract\"}"); - return; - } - int unitSize = args.get("selectUnitNum").getAsInt(); - nodeNames = - Arrays.asList(ContractManager.instance.nodeCenterConn.getClusterByKey( - contractID, - Math.max(unitSize, 3))); - } else {// all nodes' peerID in the unit - String[] nodeNamesStr = - args.get("peersID").getAsString().split(","); - // record information of these nodes - nodeNames = - Arrays.stream(nodeNamesStr) - .filter(x -> null != x && !x.isEmpty()) - .collect(Collectors.toList()); - } +// if (contract.getType().equals(ContractExecType.SelfAdaptiveSharding)) { +// if (ContractManager.instance.nodeCenterConn.listNodes().length < 3) { +// rc.onResult( +// "{\"status\":\"Error\",\"result\":\"No enough nodes!\"," +// + "\"action\":\"onStartTrustfulContract\"}"); +// return; +// } +// int unitSize = args.get("selectUnitNum").getAsInt(); +// nodeNames = +// Arrays.asList(ContractManager.instance.nodeCenterConn.getClusterByKey( +// contractID, +// Math.max(unitSize, 3))); +// } else { + // all nodes' peerID in the unit + String[] nodeNamesStr = + args.get("peersID").getAsString().split(","); + // record information of these nodes + nodeNames = + Arrays.stream(nodeNamesStr) + .filter(x -> null != x && !x.isEmpty()) + .collect(Collectors.toList()); +// } int nodeSize = nodeNames.size(); // 方式一向NodeCenter发,要求Slave节点主动连接到Master节点. diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index 19b69aa..f1b6a87 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -78,6 +78,11 @@ public class MasterClientTCPAction { } finally { CMActions.manager.stopContractWithOwner( request.get("verifiedPubKey").getAsString(), client.getContractID()); + MultiContractMeta cei = + CMActions.manager.multiContractRecorder.getMultiContractMeta(client.getContractID()); + if (null != cei && cei.contractExecutor instanceof SelfAdaptiveShardingExecutor) { + ((SelfAdaptiveShardingExecutor) cei.contractExecutor).close(); + } } } diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index d5a03bb..9ac7b9c 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -1,9 +1,11 @@ package org.bdware.server.trustedmodel; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.sc.ContractManager; +import org.bdware.sc.ContractResult; import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; @@ -46,6 +48,8 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { TimeUnit.SECONDS); ContractManager.threadPool.submit(() -> { while (running) { + LOGGER.debug("latest block=" + this.b.prevHash + ", to be executed size=" + toExecuted.size()); + LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); while (!toExecuted.isEmpty()) { String key = this.b.prevHash; Block block = toExecuted.get(key); @@ -76,10 +80,17 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { @Override public void execute(String requestID, ResultCallback rc, ContractRequest req) { if (executedTxs.containsKey(requestID)) { + rc.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("this request has been packed!")))); return; } + LOGGER.debug("receive contract request " + requestID); executedTxs.put(requestID, false); reqQueue.add(req); + rc.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Executing, + new JsonPrimitive("this request is adding into blocks")))); if (reqQueue.size() >= SUBMIT_LIMIT) { submitBlock(); } @@ -105,14 +116,17 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } private synchronized void executeBlock(Block block) { + LOGGER.debug("start"); for (ContractRequest request : block.requests) { - if (executedTxs.containsKey(request.getContractID()) && executedTxs.get(request.getContractID())) { + if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { + LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); return; } } for (ContractRequest request : block.requests) { - ContractManager.instance.executeLocally(request, null); - executedTxs.put(request.getContractID(), true); + String ret = ContractManager.instance.executeLocally(request, null); + LOGGER.debug("result of request " + request.getRequestID() + ": " + ret); + executedTxs.put(request.getRequestID(), true); } this.b = new Block(block.hash); executedBlocks.add(block.hash); @@ -122,6 +136,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { ContractManager.threadPool.execute(() -> { Block block = fillBlock(); if (null != block) { + LOGGER.debug(JsonUtil.toPrettyJson(block)); String[] nodes = this.meta.getMembers(); JsonObject req = new JsonObject(); req.addProperty("action", "deliverBlock");