diff --git a/build.gradle b/build.gradle index e7d172e..a7ed86b 100644 --- a/build.gradle +++ b/build.gradle @@ -137,6 +137,7 @@ task copyWebContent(type: Copy) { exclude 'client/README.md' exclude 'client/.idea/' exclude 'client/.gitignore' + exclude '.keep' } into "./build/output/WebContent" } @@ -167,9 +168,10 @@ task buildBDServerZip(type: Zip, dependsOn: [":agent-backend:copyWebContent", task buildBDServerZipLite(type: Zip, dependsOn: [":agent-backend:copyWebContent", ":agent-backend:copyBDWareProjectDir", ":agent-backend:copyScript", - ":agent-backend:copyJar", ":agent-backend:copyLibs", ":agent-backend:copyKeys"]) { + ":agent-backend:copyJar", ":agent-backend:copyLibs", + ":agent-backend:copyKeys"]) { from('./build/output/') { - exclude 'BDWareProjectDir/public/**' + exclude 'BDWareProjectDir/public/TF**/' exclude 'WebContent/doc/' } duplicatesStrategy = DuplicatesStrategy.INCLUDE @@ -177,9 +179,23 @@ task buildBDServerZipLite(type: Zip, dependsOn: [":agent-backend:copyWebContent" destinationDirectory = file('build/') } +task buildBDServerZipMin(type: Zip, dependsOn: [":agent-backend:copyWebContent", + ":agent-backend:copyBDWareProjectDir", ":agent-backend:copyScript", + ":agent-backend:copyJar", ":agent-backend:copyLibs", + ":agent-backend:copyKeys"]) { + from('./build/output/') { + exclude 'BDWareProjectDir/public/**' + exclude 'WebContent/doc/' + } + duplicatesStrategy = DuplicatesStrategy.INCLUDE + archiveFileName = 'bdserver-min.zip' + destinationDirectory = file('build/') +} + task buildBDServerZipUpdate(type: Zip, dependsOn: [":agent-backend:copyCP", ":agent-backend:copyWebContent", ":agent-backend:copyBDWareProjectDir", ":agent-backend:copyScript", - ":agent-backend:copyJar", ":agent-backend:copyLibs", ":agent-backend:copyKeys"]) { + ":agent-backend:copyJar", ":agent-backend:copyLibs", + ":agent-backend:copyKeys"]) { from('./build/output/') { include '*.jar' include 'libs/' diff --git a/src/main/java/org/bdware/server/action/CMActions.java b/src/main/java/org/bdware/server/action/CMActions.java index 927bc15..4fa1e46 100644 --- a/src/main/java/org/bdware/server/action/CMActions.java +++ b/src/main/java/org/bdware/server/action/CMActions.java @@ -117,15 +117,17 @@ public class CMActions implements OnHashCallback { final JsonObject args, final ResultCallback resultCallback, final OnHashCallback hashcb) { - final ContractRequest c = new ContractRequest(); - if (!args.has("contractName") && !args.has("contractID") && !args.has("contractDOI")) { + final ContractRequest cReq = new ContractRequest(); + if (!args.has("contractName") && + !args.has("contractID") && + !args.has("contractDOI")) { resultCallback.onResult(MISSING_ARGUMENT); return; } if (args.has("contractDOI") && !args.has("contractID")) { LOGGER.debug("contractDOI: " + args.get("contractDOI").getAsString()); try { - c.setContractDOI(args.get("contractDOI").getAsString()); + cReq.setContractDOI(args.get("contractDOI").getAsString()); } catch (Exception e) { e.printStackTrace(); resultCallback.onResult(INVALID_DOI); @@ -133,87 +135,91 @@ public class CMActions implements OnHashCallback { } } else { if (args.has("contractName")) { - c.setContractID(args.get("contractName").getAsString()); + cReq.setContractID(args.get("contractName").getAsString()); } if (args.has("contractID")) { - c.setContractID(args.get("contractID").getAsString()); + cReq.setContractID(args.get("contractID").getAsString()); } } - if (args.has("isDebug")) c.setFromDebug(args.get("isDebug").getAsBoolean()); + if (args.has("isDebug")) { + cReq.setFromDebug(args.get("isDebug").getAsBoolean()); + } if (args.has("withDynamicAnalysis")) - c.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean(); + cReq.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean(); if (args.has("withEvaluatesAnalysis")) - c.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean(); + cReq.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean(); if (!args.has("arg")) { resultCallback.onResult(MISSING_ARGUMENT); return; } if (args.has("operation")) { - c.setAction(args.get("operation").getAsString()); - c.setArg(args.get("arg").getAsString()); + cReq.setAction(args.get("operation").getAsString()); + cReq.setArg(args.get("arg").getAsString()); } else { JsonObject jo = JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject(); if (!jo.has("action") || !jo.has("arg")) { resultCallback.onResult(MISSING_ARGUMENT); return; } - c.setAction(jo.get("action").getAsString()); - c.setArg(jo.get("arg").getAsString()); - if (c.withEvaluatesAnalysis) { - c.setValue(jo.get("hasValue").getAsLong()); + cReq.setAction(jo.get("action").getAsString()); + cReq.setArg(jo.get("arg").getAsString()); + if (cReq.withEvaluatesAnalysis) { + cReq.setValue(jo.get("hasValue").getAsLong()); } } - if (args.has("gasLimit")) c.setGasLimit(args.get("gasLimit").getAsLong()); - + if (args.has("gasLimit")) { + cReq.setGasLimit(args.get("gasLimit").getAsLong()); + } if (args.has("requester")) { - c.setPublicKey(args.get("requester").getAsString()); + cReq.setPublicKey(args.get("requester").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); - c.setSignature(ByteUtils.toHexString(sign)); + cReq.setSignature(ByteUtils.toHexString(sign)); } if (args.has("requesterDOI")) { - c.setRequesterDOI(args.get("requesterDOI").getAsString()); + cReq.setRequesterDOI(args.get("requesterDOI").getAsString()); } else { - c.setRequesterDOI("empty"); + cReq.setRequesterDOI("empty"); } if (args.has("pubkey")) { - c.setPublicKey(args.get("pubkey").getAsString()); + cReq.setPublicKey(args.get("pubkey").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); - c.setSignature(ByteUtils.toHexString(sign)); + cReq.setSignature(ByteUtils.toHexString(sign)); } - if (c.getPublicKey() != null) { - if (!c.verifySignature()) { - c.setPublicKey(null); - c.setRequester(null); + if (cReq.getPublicKey() != null) { + if (!cReq.verifySignature()) { + cReq.setPublicKey(null); + cReq.setRequester(null); } } String reqID; - if (args.has("requestID")) reqID = args.get("requestID").getAsString(); - else { + if (args.has("requestID")) { + reqID = args.get("requestID").getAsString(); + } else { reqID = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000); args.addProperty("requestID", reqID); } - c.setRequestID(reqID); + cReq.setRequestID(reqID); long start = System.currentTimeMillis(); manager.executeContractInternal( - c, + cReq, new ResultCallback() { @Override public void onResult(JsonObject ret) { - ret.addProperty("responseID", c.getRequestID()); + ret.addProperty("responseID", cReq.getRequestID()); ret.addProperty("action", "onExecuteResult"); String costTime = (System.currentTimeMillis() - start) + ""; ret.addProperty("executeTime", costTime); ContractMeta meta = - manager.statusRecorder.getContractMeta(c.getContractID()); + manager.statusRecorder.getContractMeta(cReq.getContractID()); if (meta != null && meta.getIsDebug()) { FUNCINVOKEINFO.putOneInvoke( - c.getContractID(), - c.getAction(), - c.getRequestID(), - c.getArg(), + cReq.getContractID(), + cReq.getAction(), + cReq.getRequestID(), + cReq.getArg(), ret.has("result") ? ret.get("result").toString() : ""); } - System.out.println(ret.toString()); + System.out.println(ret); resultCallback.onResult(ret.toString()); } @@ -1159,17 +1165,17 @@ public class CMActions implements OnHashCallback { public void connectTo(JsonObject args, ResultCallback resultCallback) { String data; if (!args.has("id")) { - ReplyUtil.simpleReply(resultCallback,"onConnectTo","missing contract id"); + ReplyUtil.simpleReply(resultCallback, "onConnectTo", "missing contract id"); return; } String contractID = args.get("id").getAsString(); LOGGER.info("connectTo:" + contractID); if (contractID == null) { - ReplyUtil.simpleReply(resultCallback,"onConnectTo","can't find contract id"); + ReplyUtil.simpleReply(resultCallback, "onConnectTo", "can't find contract id"); return; } manager.redirect(contractID, createPS(), ""); - ReplyUtil.simpleReply(resultCallback,"onConnectTo","success"); + ReplyUtil.simpleReply(resultCallback, "onConnectTo", "success"); } private PrintStream createPS() { @@ -1244,7 +1250,7 @@ public class CMActions implements OnHashCallback { ExecutionManager.instance.updateLocalContractToNodeCenter(); } } else { - ReplyUtil.simpleReply(resultCallback,"onKillContractProcess","Failed: Illegal parameters"); + ReplyUtil.simpleReply(resultCallback, "onKillContractProcess", "Failed: Illegal parameters"); } } @@ -1504,14 +1510,14 @@ public class CMActions implements OnHashCallback { e.printStackTrace(); } ExecutionManager.instance.updateLocalContractToNodeCenter(); - ReplyUtil.simpleReply(resultCallback,"onKillAllContract",sb.toString()); + ReplyUtil.simpleReply(resultCallback, "onKillAllContract", sb.toString()); manager.stopAllContracts(); } else { manager.stopAllContractsWithOwner(args.get(("verifiedPubKey")).getAsString()); - ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Success"); + ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Success"); } } else { - ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Failed: Illegal user"); + ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Failed: Illegal user"); } } diff --git a/src/main/java/org/bdware/server/action/MasterWSAction.java b/src/main/java/org/bdware/server/action/MasterWSAction.java index a4ab0ac..3deeae1 100644 --- a/src/main/java/org/bdware/server/action/MasterWSAction.java +++ b/src/main/java/org/bdware/server/action/MasterWSAction.java @@ -19,8 +19,8 @@ import org.bdware.server.GlobalConf; import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.server.action.p2p.MasterServerTCPAction; import org.bdware.server.trustedmodel.MultiPointContractInfo; -import org.bdware.server.trustedmodel.ResultCollector; import org.bdware.server.trustedmodel.MultiPointCooperateContractInfo; +import org.bdware.server.trustedmodel.ResultCollector; import org.bdware.server.trustedmodel.SlaveNode; import org.bdware.units.NetworkManager; import org.zz.gmhelper.SM2KeyPair; @@ -140,14 +140,27 @@ public class MasterWSAction { } */ List nodeNames; // nodes' peerID - // all nodes' peerID in the unit - String[] nodeNamesStr = - args.get("peersID").getAsString().split(","); - // record information of these nodes - nodeNames = - Arrays.stream(nodeNamesStr) - .filter(x -> null != x && !x.isEmpty()) - .collect(Collectors.toList()); + if (contract.getType().equals(ContractExecType.SelfAdaptiveSharding)) { + if (ContractManager.instance.nodeCenterConn.listNodes().length < 3) { + rc.onResult( + "{\"status\":\"Error\",\"result\":\"No enough nodes!\"," + + "\"action\":\"onStartTrustfulContract\"}"); + return; + } + int unitSize = args.get("selectUnitNum").getAsInt(); + nodeNames = + Arrays.asList(ContractManager.instance.nodeCenterConn.getClusterByKey( + contractID, + Math.max(unitSize, 3))); + } else {// all nodes' peerID in the unit + String[] nodeNamesStr = + args.get("peersID").getAsString().split(","); + // record information of these nodes + nodeNames = + Arrays.stream(nodeNamesStr) + .filter(x -> null != x && !x.isEmpty()) + .collect(Collectors.toList()); + } int nodeSize = nodeNames.size(); // 方式一向NodeCenter发,要求Slave节点主动连接到Master节点. @@ -229,6 +242,7 @@ public class MasterWSAction { case RequestAllResponseHalf: case RequestAllResponseAll: case Sharding: + case SelfAdaptiveSharding: contract.setNumOfCopies(nodeSize); break; default: @@ -274,7 +288,7 @@ public class MasterWSAction { + contract.getID() + "\"," + "\"action\":\"onStartTrustfulContract\"}"); - LOGGER.info("启动多点合约完成"); + LOGGER.info("success!"); } private boolean waitForConnection(List nodeNames) { diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index 0457acf..6045ad9 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -154,6 +154,9 @@ public class MasterClientTCPAction { case Sharding: executor = new MultiPointCooperationExecutor(ContractExecType.Sharding, nodeSize, contractID); break; + case SelfAdaptiveSharding: + executor = new SelfAdaptiveShardingExecutor(contractID); + break; } return executor; } @@ -234,7 +237,8 @@ public class MasterClientTCPAction { .format(lastMasterPongTime) + " 认为master崩溃!"); - // 向NC发通知重新选举master,如果NC没有收到所有节点的重选请求,就认为是这个节点和master连接断开,这个节点需要重连master + // 向NC发通知重新选举master,如果NC没有收到所有节点的重选请求,就认为是这个节点和master连接断开 + // 这个节点需要重连master Map request = new HashMap<>(); request.put("action", "electMaster"); SM2KeyPair keyPair = GlobalConf.instance.keyPair; @@ -405,11 +409,11 @@ public class MasterClientTCPAction { File temp = new File(parPath, pp[pp.length - 1]); if (!temp.exists()) { result.onResult( - "{\"action\":\"onStartContractTrustfully\",\"result\":\"missing contract files\",\"requestID\":\"" - + jo.get("requestID").getAsString() - + "\",\"pubKey\":\"" - + GlobalConf.instance.keyPair.getPublicKeyStr() - + "\"}"); + String.format( + "{\"action\":\"onStartContractTrustfully\",\"result\":\"missing contract files\"," + + "\"requestID\":\"%s\",\"pubKey\":\"%s\"}", + jo.get("requestID").getAsString(), + GlobalConf.instance.keyPair.getPublicKeyStr())); return; } contract.setScript(temp.getAbsolutePath()); @@ -420,7 +424,9 @@ public class MasterClientTCPAction { cei.setMaster(jo.get("master").getAsString()); if (contract.getType() != ContractExecType.Sharding) cei.setIsMaster(GlobalConf.getNodeID().equals(jo.get("master").getAsString())); - else cei.setIsMaster(true); + else { + cei.setIsMaster(true); + } LOGGER.info("启动参数: " + JsonUtil.toJson(contract)); cei.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor @@ -485,6 +491,17 @@ public class MasterClientTCPAction { return "/" + scriptName + ".ypk"; } + @Action(async = true) + public void deliverBlock(JsonObject jo, ResultCallback result) { + if (jo.has("contractID") && jo.has("data")) { + String contractID = jo.get("contractID").getAsString(); + MultiContractMeta cei = ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractID); + if (null != cei && cei.contractExecutor instanceof SelfAdaptiveShardingExecutor) { + ((SelfAdaptiveShardingExecutor) cei.contractExecutor).execute(jo.get("data").getAsString()); + } + } + } + @Action(async = true) public void executeContractLocally(JsonObject jo, ResultCallback result) { final ContractRequest request = @@ -533,8 +550,8 @@ public class MasterClientTCPAction { MultiRequestInfo mri = MultiRequestInfo.reqInfos.get(request.getRequestID()); - if (request.seq - == cei.curExeSeq) { // 正在执行多点请求时收到多点请求,说明这个请求不是第一个到的同requestID的请求s + if (request.seq == cei.curExeSeq) { + // 正在执行多点请求时收到多点请求,说明这个请求不是第一个到的同requestID的请求s mri.callbackMap.put(jo.get("uniReqID").getAsString(), result); mri.putFlag(jo.get("uniReqID").getAsString(), false); } else if (request.seq > cei.curExeSeq diff --git a/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java b/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java index 1d27ba1..a42f271 100644 --- a/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java +++ b/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java @@ -78,9 +78,9 @@ public class MasterProxy implements MasterStub { new ResultCallback() { @Override public void onResult(String ret) { - JsonObject result =JsonUtil.parseString(ret); - ContractManager.instance.extractEventsFromContractResult( - null, result, client, request, start); + JsonObject result = JsonUtil.parseString(ret); + ContractManager.instance.extractEventsFromContractResult( + null, result, client, request, start); LOGGER.debug( new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") .format(new Date(System.currentTimeMillis())) diff --git a/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java new file mode 100644 index 0000000..ca82728 --- /dev/null +++ b/src/main/java/org/bdware/server/trustedmodel/SelfAdaptiveShardingExecutor.java @@ -0,0 +1,180 @@ +package org.bdware.server.trustedmodel; + +import com.google.gson.JsonObject; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.sc.ContractManager; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.MultiContractMeta; +import org.bdware.sc.units.RecoverFlag; +import org.bdware.sc.util.HashUtil; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.action.p2p.MasterServerRecoverMechAction; +import org.bdware.server.action.p2p.MasterServerTCPAction; + +import java.util.Arrays; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * @author Kaidong Wu + */ +public class SelfAdaptiveShardingExecutor implements ContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); + private static final int SUBMIT_LIMIT = 1024; + private final Queue reqQueue = new ConcurrentLinkedQueue<>(); + private final MultiContractMeta meta; + private final Map toExecuted = new ConcurrentHashMap<>(); + private final Set executedBlocks = ConcurrentHashMap.newKeySet(); + private final Map executedTxs = new ConcurrentHashMap<>(); + private final Object flag = new Object(); + private final ScheduledFuture future; + private boolean running = true; + private Block b = new Block(); + + public SelfAdaptiveShardingExecutor(String contractID) { + this.meta = + ContractManager.instance.multiContractRecorder.getMultiContractMeta(contractID); + this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( + this::submitBlock, + 2, + 2, + TimeUnit.SECONDS); + ContractManager.threadPool.submit(() -> { + while (running) { + while (!toExecuted.isEmpty()) { + String key = this.b.prevHash; + Block block = toExecuted.get(key); + if (null != block) { + executeBlock(block); + } + toExecuted.remove(key); + } + synchronized (flag) { + try { + flag.wait(); + } catch (InterruptedException e) { + LOGGER.warn(String.format( + "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s", + meta.getContractID(), + e.getMessage())); + } + } + } + }); + } + + public void close() { + this.future.cancel(false); + this.running = false; + } + + @Override + public void execute(String requestID, ResultCallback rc, ContractRequest req) { + if (executedTxs.containsKey(requestID)) { + return; + } + executedTxs.put(requestID, false); + reqQueue.add(req); + if (reqQueue.size() >= SUBMIT_LIMIT) { + submitBlock(); + } + } + + public void execute(String blockStr) { + Block block = JsonUtil.fromJson(blockStr, Block.class); + LOGGER.info(String.format( + "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s, %d transactions, timestamp %d", + meta.getContractID(), + block.hash, + block.prevHash, + block.requests.length, + block.timestamp)); + if (!executedBlocks.contains(block.hash)) { + toExecuted.put(block.prevHash, block); + synchronized (flag) { + flag.notify(); + } + } + } + + private synchronized void executeBlock(Block block) { + for (ContractRequest request : block.requests) { + if (executedTxs.containsKey(request.getContractID()) && executedTxs.get(request.getContractID())) { + return; + } + } + for (ContractRequest request : block.requests) { + ContractManager.instance.executeLocally(request, null); + executedTxs.put(request.getContractID(), true); + } + this.b = new Block(block.hash); + executedBlocks.add(block.hash); + } + + private void submitBlock() { + ContractManager.threadPool.execute(() -> { + Block block = fillBlock(); + if (null != block) { + String[] nodes = this.meta.getMembers(); + JsonObject req = new JsonObject(); + req.addProperty("action", "deliverBlock"); + req.addProperty("data", JsonUtil.toJson(block)); + req.addProperty("contractID", this.meta.getContractID()); + String reqStr = req.toString(); + for (String node : nodes) { + SlaveNode cmNode = MasterServerTCPAction.id2Slaves.get(node); + if (cmNode != null && + MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) + == RecoverFlag.Fine) { + cmNode.connection.sendMsg(reqStr); + } + } + } + }); + } + + private synchronized Block fillBlock() { + ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)]; + if (requests.length == 0) { + return null; + } + for (int i = 0; i < requests.length; ++i) { + requests[i] = reqQueue.poll(); + } + this.b.fillBlock(null, requests); + return this.b; + } + + static class Block { + String prevHash = "0"; + String hash; + String checkPoint; + ContractRequest[] requests; + long timestamp; + + public Block() { + } + + public Block(String prev) { + this.prevHash = prev; + } + + public void fillBlock(String cp, ContractRequest[] requests) { + this.checkPoint = cp; + this.requests = requests; + this.timestamp = System.currentTimeMillis(); + hash = HashUtil.sha3( + prevHash, + cp, + Arrays.stream(requests).map(ContractRequest::getRequestID).collect(Collectors.joining())); + } + } +}