From 9fb5cd6eb42fb69dc7d9d3e1d90632a5199130e8 Mon Sep 17 00:00:00 2001 From: "Frank.R.Wu" Date: Fri, 19 Nov 2021 14:51:25 +0800 Subject: [PATCH] feat: update SelfAdaptiveShardingExecutor update SelfAdaptiveShardingExecutor.submitBlock to avoid creating too many threads; add logs --- .../org/bdware/server/action/CMActions.java | 79 ++++++++++--------- .../bdware/server/action/MasterWSAction.java | 2 +- .../action/p2p/MasterClientTCPAction.java | 10 +-- .../SelfAdaptiveShardingExecutor.java | 55 +++++++------ 4 files changed, 80 insertions(+), 66 deletions(-) diff --git a/src/main/java/org/bdware/server/action/CMActions.java b/src/main/java/org/bdware/server/action/CMActions.java index c48a455..81d973d 100644 --- a/src/main/java/org/bdware/server/action/CMActions.java +++ b/src/main/java/org/bdware/server/action/CMActions.java @@ -120,15 +120,17 @@ public class CMActions implements OnHashCallback { final JsonObject args, final ResultCallback resultCallback, final OnHashCallback hashcb) { - final ContractRequest c = new ContractRequest(); - if (!args.has("contractName") && !args.has("contractID") && !args.has("contractDOI")) { + final ContractRequest cReq = new ContractRequest(); + if (!args.has("contractName") && + !args.has("contractID") && + !args.has("contractDOI")) { resultCallback.onResult(MISSING_ARGUMENT); return; } if (args.has("contractDOI") && !args.has("contractID")) { LOGGER.debug("contractDOI: " + args.get("contractDOI").getAsString()); try { - c.setContractDOI(args.get("contractDOI").getAsString()); + cReq.setContractDOI(args.get("contractDOI").getAsString()); } catch (Exception e) { e.printStackTrace(); resultCallback.onResult(INVALID_DOI); @@ -136,86 +138,91 @@ public class CMActions implements OnHashCallback { } } else { if (args.has("contractName")) { - c.setContractID(args.get("contractName").getAsString()); + cReq.setContractID(args.get("contractName").getAsString()); } if (args.has("contractID")) { - c.setContractID(args.get("contractID").getAsString()); + cReq.setContractID(args.get("contractID").getAsString()); } } - if (args.has("isDebug")) c.setFromDebug(args.get("isDebug").getAsBoolean()); + if (args.has("isDebug")) { + cReq.setFromDebug(args.get("isDebug").getAsBoolean()); + } if (args.has("withDynamicAnalysis")) - c.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean(); + cReq.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean(); if (args.has("withEvaluatesAnalysis")) - c.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean(); + cReq.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean(); if (!args.has("arg")) { resultCallback.onResult(MISSING_ARGUMENT); return; } if (args.has("operation")) { - c.setAction(args.get("operation").getAsString()); - c.setArg(args.get("arg").getAsString()); + cReq.setAction(args.get("operation").getAsString()); + cReq.setArg(args.get("arg").getAsString()); } else { JsonObject jo = JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject(); if (!jo.has("action") || !jo.has("arg")) { resultCallback.onResult(MISSING_ARGUMENT); return; } - c.setAction(jo.get("action").getAsString()); - c.setArg(jo.get("arg").getAsString()); - if (c.withEvaluatesAnalysis) { - c.setValue(jo.get("hasValue").getAsLong()); + cReq.setAction(jo.get("action").getAsString()); + cReq.setArg(jo.get("arg").getAsString()); + if (cReq.withEvaluatesAnalysis) { + cReq.setValue(jo.get("hasValue").getAsLong()); } } - if (args.has("gasLimit")) c.setGasLimit(args.get("gasLimit").getAsLong()); - + if (args.has("gasLimit")) { + cReq.setGasLimit(args.get("gasLimit").getAsLong()); + } if (args.has("requester")) { - c.setPublicKey(args.get("requester").getAsString()); + cReq.setPublicKey(args.get("requester").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); - c.setSignature(ByteUtils.toHexString(sign)); + cReq.setSignature(ByteUtils.toHexString(sign)); } if (args.has("requesterDOI")) { - c.setRequesterDOI(args.get("requesterDOI").getAsString()); + cReq.setRequesterDOI(args.get("requesterDOI").getAsString()); } else { - c.setRequesterDOI("empty"); + cReq.setRequesterDOI("empty"); } if (args.has("pubkey")) { - c.setPublicKey(args.get("pubkey").getAsString()); + cReq.setPublicKey(args.get("pubkey").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); - c.setSignature(ByteUtils.toHexString(sign)); + cReq.setSignature(ByteUtils.toHexString(sign)); } - if (c.getPublicKey() != null) { - if (!c.verifySignature()) { - c.setPublicKey(null); - c.setRequester(null); + if (cReq.getPublicKey() != null) { + if (!cReq.verifySignature()) { + cReq.setPublicKey(null); + cReq.setRequester(null); } } String reqID; - if (args.has("requestID")) reqID = args.get("requestID").getAsString(); - else { + if (args.has("requestID")) { + reqID = args.get("requestID").getAsString(); + } else { reqID = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000); args.addProperty("requestID", reqID); } - c.setRequestID(reqID); + cReq.setRequestID(reqID); long start = System.currentTimeMillis(); manager.executeContractInternal( - c, + cReq, new ResultCallback() { @Override public void onResult(JsonObject ret) { - ret.addProperty("responseID", c.getRequestID()); + ret.addProperty("responseID", cReq.getRequestID()); ret.addProperty("action", "onExecuteResult"); String costTime = (System.currentTimeMillis() - start) + ""; ret.addProperty("executeTime", costTime); ContractMeta meta = - manager.statusRecorder.getContractMeta(c.getContractID()); + manager.statusRecorder.getContractMeta(cReq.getContractID()); if (meta != null && meta.getIsDebug()) { FUNCINVOKEINFO.putOneInvoke( - c.getContractID(), - c.getAction(), - c.getRequestID(), - c.getArg(), + cReq.getContractID(), + cReq.getAction(), + cReq.getRequestID(), + cReq.getArg(), ret.has("result") ? ret.get("result").toString() : ""); } + LOGGER.debug("result of request " + cReq.getRequestID() + ": " + ret); resultCallback.onResult(ret.toString()); } diff --git a/src/main/java/org/bdware/server/action/MasterWSAction.java b/src/main/java/org/bdware/server/action/MasterWSAction.java index 5a053ed..b3387fa 100644 --- a/src/main/java/org/bdware/server/action/MasterWSAction.java +++ b/src/main/java/org/bdware/server/action/MasterWSAction.java @@ -303,7 +303,7 @@ public class MasterWSAction { + contract.getID() + "\"," + "\"action\":\"onStartTrustfulContract\"}"); - LOGGER.info("success!"); + LOGGER.info("startContractMultiPoint succeed!"); } } diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index b2c8bcb..b0ca462 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -193,13 +193,13 @@ public class MasterClientTCPAction { Contract contract = JsonUtil.fromJson(jo.get("contractStr").getAsString(), Contract.class); String contractID = contract.getID(); // 获取contract type - LOGGER.info(contract.getType()); + LOGGER.debug(contract.getType()); if (contract.getType() == ContractExecType.Sharding) { // 每节点都是master,且MPCI中需要实例化出MultiPointCooperationExecutor, // 需要计算出自己的ShardingID?路由规则(id/requester/arg-->shardingId), // 也在MultiPointCooperationExecutor中实现 } - //TOODO master连接 + //TODO master连接 // contractID2MasterInfo.put(contractID, this); // 记录contractID 和 master之间的对应关系 MultiContractMeta cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID); @@ -252,12 +252,10 @@ public class MasterClientTCPAction { cei.setIsMaster(true); } - LOGGER.info("启动参数: " + JsonUtil.toJson(contract)); - - + LOGGER.debug("startup arguments: " + JsonUtil.toJson(contract)); String ret = CMActions.manager.startContract(contract); // 调用CMActions 里的启动合约的方法,启动结果 - LOGGER.info("启动结果为 " + ret); + LOGGER.debug("startup result: " + ret); CMActions.manager.multiContractRecorder.updateValue(cei); ContractMeta meta = CMActions.manager.statusRecorder.createIfNotExist(contractID); diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index 91e9215..1f8dff7 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -17,10 +17,7 @@ import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.units.NetworkManager; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.stream.Collectors; /** @@ -47,7 +44,10 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { 2, 2, TimeUnit.SECONDS); - ContractManager.threadPool.submit(() -> { + LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", + ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), + ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); + ContractManager.threadPool.execute(() -> { LOGGER.info( "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running); while (running) { @@ -91,10 +91,16 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { ContractResult.Status.Executing, new JsonPrimitive("this request is adding into blocks")))); if (reqQueue.size() >= SUBMIT_LIMIT) { - submitBlock(); + ContractManager.threadPool.execute(this::submitBlock); } } + @Override + public void close() { + this.future.cancel(false); + this.running = false; + LOGGER.info("destruct executor of contract " + meta.getContractID()); + } public void execute(String blockStr) { Block block = JsonUtil.fromJson(blockStr, Block.class); @@ -128,7 +134,11 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { // TODO check status for (ContractRequest request : block.requests) { String ret = ContractManager.instance.executeLocally(request, null); - LOGGER.debug("result of request " + request.getRequestID() + ": " + ret); + LOGGER.debug(String.format( + "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", + meta.getContractID(), + request.getRequestID(), + ret)); executedTxs.put(request.getRequestID(), true); } LOGGER.info(String.format( @@ -142,24 +152,23 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } private void submitBlock() { - ContractManager.threadPool.execute(() -> { - Block block = fillBlock(); - if (null != block) { - LOGGER.debug(JsonUtil.toPrettyJson(block)); - String[] nodes = this.meta.getMembers(); - JsonObject req = new JsonObject(); - req.addProperty("action", "deliverBlock"); - req.addProperty("data", JsonUtil.toJson(block)); - req.addProperty("contractID", this.meta.getContractID()); - String reqStr = req.toString(); - for (String node : nodes) { - if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) - == RecoverFlag.Fine) { - NetworkManager.instance.sendToAgent(node, reqStr); - } + Block block = fillBlock(); + if (null != block) { + LOGGER.info("deliver block " + block.hash + "..."); + LOGGER.debug(JsonUtil.toPrettyJson(block)); + String[] nodes = this.meta.getMembers(); + JsonObject req = new JsonObject(); + req.addProperty("action", "deliverBlock"); + req.addProperty("data", JsonUtil.toJson(block)); + req.addProperty("contractID", this.meta.getContractID()); + String reqStr = req.toString(); + for (String node : nodes) { + if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) + == RecoverFlag.Fine) { + NetworkManager.instance.sendToAgent(node, reqStr); } } - }); + } } private synchronized Block fillBlock() {