From 60296666039c6b8d408cac10a30f336d28ad95cd Mon Sep 17 00:00:00 2001 From: "Frank.R.Wu" Date: Mon, 22 Nov 2021 14:38:28 +0800 Subject: [PATCH] merge --- .../server/tcp/TCPClientFrameHandler.java | 4 +- .../SelfAdaptiveShardingExecutor.java | 54 ++++++++++++++++--- src/main/resources/log4j2.properties | 2 +- 3 files changed, 51 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java index 97e579b..ceff348 100644 --- a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java @@ -171,8 +171,8 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { try { JsonObject jo = JsonUtil.parseStringAsJsonObject(json); } catch (Exception e) { - e.printStackTrace(); - System.out.println("============[MasterClientFrameHandler]JsonParse Error:" + json); + LOGGER.warn("JsonParse Error! " + e.getMessage()); + LOGGER.debug("JsonParse Error! " + json); } ByteBuf buf = Unpooled.wrappedBuffer(json.getBytes()); ctx.channel().writeAndFlush(buf); diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java index 30a6d1e..eb31fde 100644 --- a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -4,15 +4,18 @@ import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.bdware.sc.ContractClient; import org.bdware.sc.ContractManager; import org.bdware.sc.ContractResult; import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.bean.FunctionDesp; import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.util.HashUtil; import org.bdware.sc.util.JsonUtil; +import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.units.NetworkManager; @@ -38,7 +41,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { public SelfAdaptiveShardingExecutor(String contractID) { this.meta = - ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractID); + CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( this::submitBlock, 2, @@ -77,19 +80,48 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hrc) { + public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { + // check client + ContractClient client = CMActions.manager.getClient(meta.getContractID()); + if (null == client) { + LOGGER.error("contract " + meta.getContractID() + " not found!"); + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); + return; + } + // check function + FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); + if (null == funDesp) { + LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive( + String.format("action %s of contract %s not found!", + req.getAction(), + meta.getContractID()))))); + return; + } + // for view function, execute it + if (funDesp.isView) { + CMActions.manager.executeLocallyAsync(req, rcb, hcb); + return; + } + // normal function, check if it is in blocks if (executedTxs.containsKey(requestID)) { - rc.onResult(JsonUtil.toJson(new ContractResult( + rcb.onResult(JsonUtil.toJson(new ContractResult( ContractResult.Status.Error, new JsonPrimitive("this request has been packed!")))); return; } + // add blocks into request cache LOGGER.debug("receive contract request " + requestID); executedTxs.put(requestID, false); reqQueue.add(req); - rc.onResult(JsonUtil.toJson(new ContractResult( + rcb.onResult(JsonUtil.toJson(new ContractResult( ContractResult.Status.Executing, new JsonPrimitive("this request is adding into blocks")))); + // if cache is full, submit if (reqQueue.size() >= SUBMIT_LIMIT) { ContractManager.threadPool.execute(this::submitBlock); } @@ -97,16 +129,19 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { @Override public void close() { - this.future.cancel(false); + // stop threads + this.future.cancel(true); this.running = false; LOGGER.info("destruct executor of contract " + meta.getContractID()); } public void execute(String blockStr) { Block block = JsonUtil.fromJson(blockStr, Block.class); + // the block must have not been cached or executed, and must be valid if (!toExecuted.containsKey(block.prevHash) && !executedBlocks.contains(block.hash) && block.isValid()) { + // add block into block cache LOGGER.info(String.format( "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + " %d transactions, timestamp=%d, size=%d", @@ -117,6 +152,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { block.timestamp, blockStr.length())); toExecuted.put(block.prevHash, block); + // notify thread to execute blocks synchronized (flag) { flag.notify(); } @@ -124,7 +160,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } private synchronized void executeBlock(Block block) { + // used for the thread to execute blocks LOGGER.debug("start"); + // check contract requests, requests must have not been executed for (ContractRequest request : block.requests) { if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); @@ -132,8 +170,9 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } } // TODO check status + // executed requests for (ContractRequest request : block.requests) { - String ret = ContractManager.instance.executeLocally(request, null); + String ret = CMActions.manager.executeLocally(request, null); LOGGER.debug(String.format( "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", meta.getContractID(), @@ -162,6 +201,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { req.addProperty("data", JsonUtil.toJson(block)); req.addProperty("contractID", this.meta.getContractID()); String reqStr = req.toString(); + // deliver blocks for (String node : nodes) { if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) == RecoverFlag.Fine) { @@ -172,6 +212,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } private synchronized Block fillBlock() { + // pack contract requests into a block ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)]; if (requests.length == 0) { return null; @@ -217,6 +258,7 @@ public class SelfAdaptiveShardingExecutor implements ContractExecutor { } private String merkle(ContractRequest[] requests) { + // manage requests as a merkle tree if (requests.length == 0) { return null; } diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties index f3c8a11..3c01bc8 100644 --- a/src/main/resources/log4j2.properties +++ b/src/main/resources/log4j2.properties @@ -10,6 +10,6 @@ appender.rolling.append=true appender.rolling.fileName=./log/cm.log appender.rolling.layout.type=PatternLayout appender.rolling.layout.pattern=%d-%m%n -rootLogger.level=debug +rootLogger.level=info rootLogger.appenderRef.stdout.ref=STDOUT rootLogger.appenderRef.log.ref=log \ No newline at end of file