From 098c167a9534afcecb59d5aea8836df04b3b10aa Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Thu, 18 Nov 2021 19:45:52 +0800 Subject: [PATCH] merge feature-dengshuang step1. prune ContractExecutor --- CommitAlgorithm.README.md | 4 + README.md | 2 +- .../org/bdware/server/action/CMActions.java | 103 ++++++++---------- .../action/p2p/MasterClientTCPAction.java | 20 +++- .../p2p/MasterServerRecoverMechAction.java | 6 +- .../action/p2p/MasterServerTCPAction.java | 70 ++++++------ .../RequestAllExecutor.java | 19 +++- .../MultiPointCooperationExecutor.java | 9 +- .../unconsistency}/RequestOnceExecutor.java | 9 +- .../unconsistency}/ResponseOnceExecutor.java | 7 +- .../_UNUSED_RouteEnabledExecutor.java} | 13 ++- .../server/tcp/TCPClientFrameHandler.java | 4 +- .../server/trustedmodel/MasterProxy.java | 36 ------ .../java/org/bdware/units/NetworkManager.java | 6 +- src/main/resources/log4j.properties | 7 -- src/main/resources/log4j2.properties | 2 +- 16 files changed, 151 insertions(+), 166 deletions(-) create mode 100644 CommitAlgorithm.README.md rename src/main/java/org/bdware/server/{trustedmodel => executor}/RequestAllExecutor.java (94%) rename src/main/java/org/bdware/server/{trustedmodel => executor/unconsistency}/MultiPointCooperationExecutor.java (97%) rename src/main/java/org/bdware/server/{trustedmodel => executor/unconsistency}/RequestOnceExecutor.java (89%) rename src/main/java/org/bdware/server/{trustedmodel => executor/unconsistency}/ResponseOnceExecutor.java (92%) rename src/main/java/org/bdware/server/{trustedmodel/RouteEnabledExecutor.java => executor/unconsistency/_UNUSED_RouteEnabledExecutor.java} (88%) delete mode 100644 src/main/resources/log4j.properties diff --git a/CommitAlgorithm.README.md b/CommitAlgorithm.README.md new file mode 100644 index 0000000..470ab84 --- /dev/null +++ b/CommitAlgorithm.README.md @@ -0,0 +1,4 @@ +# 相关类 +ContractExecutor +sendRequest给其他节点(MasterClientTCPAction."executeContractLocally") +receiveTrustfullyResult \ No newline at end of file diff --git a/README.md b/README.md index 5047d15..7d96d3b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -#项说明 +#项目说明 在每个bundle git clone后 都执行 diff --git a/src/main/java/org/bdware/server/action/CMActions.java b/src/main/java/org/bdware/server/action/CMActions.java index 664b14e..66159bd 100644 --- a/src/main/java/org/bdware/server/action/CMActions.java +++ b/src/main/java/org/bdware/server/action/CMActions.java @@ -34,9 +34,9 @@ import java.util.*; public class CMActions implements OnHashCallback { private static final String PARAM_ACTION = "action"; - private static final JsonObject MISSING_ARGUMENT = JsonUtil.parseStringAsJsonObject("{\"action\":\"onExecuteResult\",\"executeTime\":-1," + private static final JsonObject MISSING_ARGUMENT = JsonUtil.parseString("{\"action\":\"onExecuteResult\",\"executeTime\":-1," + "\"status\":\"Error\",\"result\":\"missing arguments\"}"); - private static final JsonObject INVALID_DOI = JsonUtil.parseStringAsJsonObject( + private static final JsonObject INVALID_DOI = JsonUtil.parseString( "{\"action\":\"onExecuteResult\",\"executeTime\":-1," + "\"status\":\"Error\",\"result\":\"invalid contract doi\"}"); private static final Logger LOGGER = LogManager.getLogger(CMActions.class); @@ -117,17 +117,15 @@ public class CMActions implements OnHashCallback { final JsonObject args, final ResultCallback resultCallback, final OnHashCallback hashcb) { - final ContractRequest cReq = new ContractRequest(); - if (!args.has("contractName") && - !args.has("contractID") && - !args.has("contractDOI")) { + final ContractRequest c = new ContractRequest(); + if (!args.has("contractName") && !args.has("contractID") && !args.has("contractDOI")) { resultCallback.onResult(MISSING_ARGUMENT); return; } if (args.has("contractDOI") && !args.has("contractID")) { LOGGER.debug("contractDOI: " + args.get("contractDOI").getAsString()); try { - cReq.setContractDOI(args.get("contractDOI").getAsString()); + c.setContractDOI(args.get("contractDOI").getAsString()); } catch (Exception e) { e.printStackTrace(); resultCallback.onResult(INVALID_DOI); @@ -135,91 +133,86 @@ public class CMActions implements OnHashCallback { } } else { if (args.has("contractName")) { - cReq.setContractID(args.get("contractName").getAsString()); + c.setContractID(args.get("contractName").getAsString()); } if (args.has("contractID")) { - cReq.setContractID(args.get("contractID").getAsString()); + c.setContractID(args.get("contractID").getAsString()); } } - if (args.has("isDebug")) { - cReq.setFromDebug(args.get("isDebug").getAsBoolean()); - } + if (args.has("isDebug")) c.setFromDebug(args.get("isDebug").getAsBoolean()); if (args.has("withDynamicAnalysis")) - cReq.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean(); + c.withDynamicAnalysis = args.get("withDynamicAnalysis").getAsBoolean(); if (args.has("withEvaluatesAnalysis")) - cReq.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean(); + c.withEvaluatesAnalysis = args.get("withEvaluatesAnalysis").getAsBoolean(); if (!args.has("arg")) { resultCallback.onResult(MISSING_ARGUMENT); return; } if (args.has("operation")) { - cReq.setAction(args.get("operation").getAsString()); - cReq.setArg(args.get("arg").getAsString()); + c.setAction(args.get("operation").getAsString()); + c.setArg(args.get("arg").getAsString()); } else { JsonObject jo = JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject(); if (!jo.has("action") || !jo.has("arg")) { resultCallback.onResult(MISSING_ARGUMENT); return; } - cReq.setAction(jo.get("action").getAsString()); - cReq.setArg(jo.get("arg").getAsString()); - if (cReq.withEvaluatesAnalysis) { - cReq.setValue(jo.get("hasValue").getAsLong()); + c.setAction(jo.get("action").getAsString()); + c.setArg(jo.get("arg").getAsString()); + if (c.withEvaluatesAnalysis) { + c.setValue(jo.get("hasValue").getAsLong()); } } - if (args.has("gasLimit")) { - cReq.setGasLimit(args.get("gasLimit").getAsLong()); - } + if (args.has("gasLimit")) c.setGasLimit(args.get("gasLimit").getAsLong()); + if (args.has("requester")) { - cReq.setPublicKey(args.get("requester").getAsString()); + c.setPublicKey(args.get("requester").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); - cReq.setSignature(ByteUtils.toHexString(sign)); + c.setSignature(ByteUtils.toHexString(sign)); } if (args.has("requesterDOI")) { - cReq.setRequesterDOI(args.get("requesterDOI").getAsString()); + c.setRequesterDOI(args.get("requesterDOI").getAsString()); } else { - cReq.setRequesterDOI("empty"); + c.setRequesterDOI("empty"); } if (args.has("pubkey")) { - cReq.setPublicKey(args.get("pubkey").getAsString()); + c.setPublicKey(args.get("pubkey").getAsString()); byte[] sign = ByteUtils.fromHexString(args.get("signature").getAsString()); - cReq.setSignature(ByteUtils.toHexString(sign)); + c.setSignature(ByteUtils.toHexString(sign)); } - if (cReq.getPublicKey() != null) { - if (!cReq.verifySignature()) { - cReq.setPublicKey(null); - cReq.setRequester(null); + if (c.getPublicKey() != null) { + if (!c.verifySignature()) { + c.setPublicKey(null); + c.setRequester(null); } } String reqID; - if (args.has("requestID")) { - reqID = args.get("requestID").getAsString(); - } else { + if (args.has("requestID")) reqID = args.get("requestID").getAsString(); + else { reqID = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000); args.addProperty("requestID", reqID); } - cReq.setRequestID(reqID); + c.setRequestID(reqID); long start = System.currentTimeMillis(); manager.executeContractInternal( - cReq, + c, new ResultCallback() { @Override public void onResult(JsonObject ret) { - ret.addProperty("responseID", cReq.getRequestID()); + ret.addProperty("responseID", c.getRequestID()); ret.addProperty("action", "onExecuteResult"); String costTime = (System.currentTimeMillis() - start) + ""; ret.addProperty("executeTime", costTime); ContractMeta meta = - manager.statusRecorder.getContractMeta(cReq.getContractID()); + manager.statusRecorder.getContractMeta(c.getContractID()); if (meta != null && meta.getIsDebug()) { FUNCINVOKEINFO.putOneInvoke( - cReq.getContractID(), - cReq.getAction(), - cReq.getRequestID(), - cReq.getArg(), + c.getContractID(), + c.getAction(), + c.getRequestID(), + c.getArg(), ret.has("result") ? ret.get("result").toString() : ""); } - System.out.println(ret); resultCallback.onResult(ret.toString()); } @@ -247,7 +240,7 @@ public class CMActions implements OnHashCallback { } // JsonObject jo = - // JsonParser.parseStringAsJsonObject(args.get("arg").getAsString()).getAsJsonObject(); + // JsonParser.parseString(args.get("arg").getAsString()).getAsJsonObject(); // if (!jo.has("action")) { // resultCallback.onResult(MISSING_ARGUMENT); // return; @@ -475,7 +468,7 @@ public class CMActions implements OnHashCallback { String data = "failed"; String contractID = ""; String operation = ""; - //JsonElement mask = JsonParser.parseStringAsJsonObject(""); + //JsonElement mask = JsonParser.parseString(""); if (args.has("contractID") && args.has("operation") && args.has("arg")) { contractID = args.get("contractID").getAsString(); System.out.println(contractID); @@ -1165,17 +1158,17 @@ public class CMActions implements OnHashCallback { public void connectTo(JsonObject args, ResultCallback resultCallback) { String data; if (!args.has("id")) { - ReplyUtil.simpleReply(resultCallback, "onConnectTo", "missing contract id"); + ReplyUtil.simpleReply(resultCallback,"onConnectTo","missing contract id"); return; } String contractID = args.get("id").getAsString(); LOGGER.info("connectTo:" + contractID); if (contractID == null) { - ReplyUtil.simpleReply(resultCallback, "onConnectTo", "can't find contract id"); + ReplyUtil.simpleReply(resultCallback,"onConnectTo","can't find contract id"); return; } manager.redirect(contractID, createPS(), ""); - ReplyUtil.simpleReply(resultCallback, "onConnectTo", "success"); + ReplyUtil.simpleReply(resultCallback,"onConnectTo","success"); } private PrintStream createPS() { @@ -1250,7 +1243,7 @@ public class CMActions implements OnHashCallback { ExecutionManager.instance.updateLocalContractToNodeCenter(); } } else { - ReplyUtil.simpleReply(resultCallback, "onKillContractProcess", "Failed: Illegal parameters"); + ReplyUtil.simpleReply(resultCallback,"onKillContractProcess","Failed: Illegal parameters"); } } @@ -1510,14 +1503,14 @@ public class CMActions implements OnHashCallback { e.printStackTrace(); } ExecutionManager.instance.updateLocalContractToNodeCenter(); - ReplyUtil.simpleReply(resultCallback, "onKillAllContract", sb.toString()); + ReplyUtil.simpleReply(resultCallback,"onKillAllContract",sb.toString()); manager.stopAllContracts(); } else { manager.stopAllContractsWithOwner(args.get(("verifiedPubKey")).getAsString()); - ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Success"); + ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Success"); } } else { - ReplyUtil.simpleReply(resultCallback, "onKillAllContract", "Failed: Illegal user"); + ReplyUtil.simpleReply(resultCallback,"onKillAllContract","Failed: Illegal user"); } } @@ -1852,7 +1845,7 @@ public class CMActions implements OnHashCallback { LOGGER.debug("startContractConfig"); // TODO private contract // if (args.has("isPrivate") && args.get("isPrivate").getAsBoolean()) { - // args.add("pubkey", JsonParser.parseStringAsJsonObject(handler.getPubKey())); + // args.add("pubkey", JsonParser.parseString(handler.getPubKey())); // } boolean type = args.get("type").getAsString().equalsIgnoreCase("TCP"); String contract = args.get("contractName").getAsString(); diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index f54aa3e..b72a2d2 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -17,8 +17,15 @@ import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.*; +import org.bdware.server.executor.RequestAllExecutor; +import org.bdware.server.executor.unconsistency.MultiPointCooperationExecutor; +import org.bdware.server.executor.unconsistency.RequestOnceExecutor; +import org.bdware.server.executor.unconsistency.ResponseOnceExecutor; import org.bdware.server.tcp.TCPClientFrameHandler; -import org.bdware.server.trustedmodel.*; +import org.bdware.server.trustedmodel.ContractExecutor; +import org.bdware.server.trustedmodel.KillUnitContractInfo; +import org.bdware.server.trustedmodel.MasterProxy; +import org.bdware.server.trustedmodel.SingleNodeExecutor; import org.bdware.units.NetworkManager; import org.bdware.units.function.ExecutionManager; import org.zz.gmhelper.SM2KeyPair; @@ -132,7 +139,7 @@ public class MasterClientTCPAction { switch (contract.getType()) { case Sole: LOGGER.info("Sole contract is not supported in multi-point mode"); - return null; + return SingleNodeExecutor.instance; case RequestOnce: executor = new RequestOnceExecutor(contractID); break; @@ -383,6 +390,7 @@ public class MasterClientTCPAction { contractID2MasterInfo.put(contractID, this); // 记录contractID 和 master之间的对应关系 MultiContractMeta cei = CMActions.manager.multiContractRecorder.createIfNotExist(contractID); + cei.setLastExeSeq(-1); if (!contract.getScriptStr().startsWith("/")) { contract.setScript(dumpToDisk(contract, jo)); @@ -433,11 +441,14 @@ public class MasterClientTCPAction { } LOGGER.info("启动参数: " + JsonUtil.toJson(contract)); - cei.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor + + String ret = CMActions.manager.startContract(contract); // 调用CMActions 里的启动合约的方法,启动结果 LOGGER.info("启动结果为 " + ret); CMActions.manager.multiContractRecorder.updateValue(cei); + ContractMeta meta = CMActions.manager.statusRecorder.createIfNotExist(contractID); + meta.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor // TODO 合约终止后从数据库中移除,但是为了测试可以人为制造合约终止但不从数据库中移除(异常停止) KeyValueDBUtil.instance.setValue(CMTables.UnitContracts.toString(), contractID, "exist"); @@ -536,15 +547,12 @@ public class MasterClientTCPAction { result.onResult(JsonUtil.toJson(ret)); } else { // reqeust all response all need seq String contractID = request.getContractID(); - MultiContractMeta cei = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); boolean putToQueue = false; - // 对于多点合约的请求 if (request.getRequestID().endsWith("_mul")) { // 收到的多点请求不能同时处理 - boolean isFirst = false; synchronized (MultiRequestInfo.lock) { // 加锁,防止同时放入多个同requestID的请求到cei的队列中 if (MultiRequestInfo.reqInfos.containsKey(request.getRequestID())) { diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java index 81a7e86..b6e2556 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java @@ -18,7 +18,7 @@ import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; import org.bdware.server.tcp.TCPServerFrameHandler; import org.bdware.server.trustedmodel.ContractUnitStatus; -import org.bdware.server.trustedmodel.RequestAllExecutor; +import org.bdware.server.executor.RequestAllExecutor; import org.bdware.server.trustedmodel.SlaveNode; import org.bdware.units.NetworkManager; import org.zz.gmhelper.SM2KeyPair; @@ -317,14 +317,14 @@ public class MasterServerRecoverMechAction { } ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); - cei.setContractExecutor( + meta.setContractExecutor( MasterClientTCPAction.createContractExecutor(meta.contract, contractID)); switch (meta.contract.getType()) { case RequestAllResponseFirst: case RequestAllResponseHalf: case RequestAllResponseAll: case Sharding: - ((RequestAllExecutor) cei.contractExecutor).setSeq(cei.getLastExeSeq() + 1); + ((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); break; default: break; diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java index 152544e..9107850 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java @@ -17,16 +17,14 @@ import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.units.RequestCache; import org.bdware.sc.util.JsonUtil; import org.bdware.server.CongestionControl; -import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; import org.bdware.server.action.SyncResult; +import org.bdware.server.executor.RequestAllExecutor; import org.bdware.server.tcp.TCPServerFrameHandler; import org.bdware.server.trustedmodel.KillUnitContractResultCollector; -import org.bdware.server.trustedmodel.RequestAllExecutor; import org.bdware.server.trustedmodel.ResultCollector; import org.bdware.server.trustedmodel.SlaveNode; -import org.zz.gmhelper.SM2KeyPair; import java.text.SimpleDateFormat; import java.util.*; @@ -253,22 +251,23 @@ public class MasterServerTCPAction { MasterServerTCPAction.sync.wakeUp(responseID, jo.toString()); } - @Action(async = true) - // 假设该结点有运行这个合约调用的合约 - public void executeContractLocallyServer(JsonObject jo, ResultCallback cb) { - final ContractRequest request = - JsonUtil.fromJson(jo.get("data").toString(), ContractRequest.class); - long start = System.currentTimeMillis(); - String data2 = CMActions.manager.executeLocally(request, null); - Map ret = new HashMap<>(); - ret.put("action", "receiveTrustfullyResultServer"); - SM2KeyPair keyPair = GlobalConf.instance.keyPair; - ret.put("nodeID", keyPair.getPublicKeyStr()); - ret.put("responseID", jo.get("uniReqID").getAsString()); - ret.put("executeTime", (System.currentTimeMillis() - start) + ""); - ret.put("data", data2); - cb.onResult(JsonUtil.toJson(ret)); - } +// TODO to Remove +// @Action(async = true) +// // 假设该结点有运行这个合约调用的合约 +// public void executeContractLocallyServer(JsonObject jo, ResultCallback cb) { +// final ContractRequest request = +// JsonUtil.fromJson(jo.get("data").toString(), ContractRequest.class); +// long start = System.currentTimeMillis(); +// String data2 = CMActions.manager.executeLocally(request, null); +// Map ret = new HashMap<>(); +// ret.put("action", "receiveTrustfullyResultServer"); +// SM2KeyPair keyPair = GlobalConf.instance.keyPair; +// ret.put("nodeID", keyPair.getPublicKeyStr()); +// ret.put("responseID", jo.get("uniReqID").getAsString()); +// ret.put("executeTime", (System.currentTimeMillis() - start) + ""); +// ret.put("data", data2); +// cb.onResult(JsonUtil.toJson(ret)); +// } @Action(async = true) public void setNodeInfo(JsonObject jo, ResultCallback cb) { @@ -334,8 +333,9 @@ public class MasterServerTCPAction { // 这个是个多节点的合约 // Just forward it to the correct Node // Master节点直接发3个,聚合后返回结果。 - info.contractExecutor.execute( + contractMeta.contractExecutor.execute( requestID, + JsonUtil.fromJson(cr, ContractRequest.class), new ResultCallback() { @Override public void onResult(String str) { @@ -346,8 +346,7 @@ public class MasterServerTCPAction { cb.onResult(JsonUtil.toJson(result)); CongestionControl.masterServerLoad.decrementAndGet(); } - }, - JsonUtil.fromJson(cr, ContractRequest.class)); + }, null); } else { ContractRequest contractRequest = JsonUtil.fromJson(cr, ContractRequest.class); if (contractMeta.getStatus() == HANGED) { @@ -359,20 +358,19 @@ public class MasterServerTCPAction { // 这个是个单节点的合约 // executeContract(CacheTest应该要有多个进来。 if (null != client && !client.getContractType().needSeq()) { - CMActions.manager.executeLocallyAsync( - contractRequest, - new ResultCallback() { - @Override - public void onResult(String str) { - Map result = new HashMap<>(); - result.put("action", "receiveContractExecution"); - result.put("responseID", cr.get("requestID").getAsString()); - result.put("data", str); - cb.onResult(JsonUtil.toJson(result)); - CongestionControl.masterServerLoad.decrementAndGet(); - } - }, - null); + contractMeta.contractExecutor.execute(requestID, contractRequest, new ResultCallback() { + @Override + public void onResult(String str) { + Map result = new HashMap<>(); + result.put("action", "receiveContractExecution"); + result.put("responseID", cr.get("requestID").getAsString()); + result.put("data", str); + cb.onResult(JsonUtil.toJson(result)); + LOGGER.debug("Return requestContractExecution:"+JsonUtil.toJson(result)); + + CongestionControl.masterServerLoad.decrementAndGet(); + } + }, null); } else { LOGGER.debug("send ReRoute response:" + cr.toString()); JsonObject result = new JsonObject(); diff --git a/src/main/java/org/bdware/server/trustedmodel/RequestAllExecutor.java b/src/main/java/org/bdware/server/executor/RequestAllExecutor.java similarity index 94% rename from src/main/java/org/bdware/server/trustedmodel/RequestAllExecutor.java rename to src/main/java/org/bdware/server/executor/RequestAllExecutor.java index 449752b..22faf72 100644 --- a/src/main/java/org/bdware/server/trustedmodel/RequestAllExecutor.java +++ b/src/main/java/org/bdware/server/executor/RequestAllExecutor.java @@ -1,4 +1,4 @@ -package org.bdware.server.trustedmodel; +package org.bdware.server.executor; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @@ -9,7 +9,9 @@ import org.bdware.sc.ComponedContractResult; import org.bdware.sc.ContractResult; import org.bdware.sc.bean.ContractExecType; import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.RecoverFlag; import org.bdware.sc.units.RequestCache; import org.bdware.sc.units.ResultCache; @@ -17,6 +19,10 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.server.trustedmodel.ContractExecutor; +import org.bdware.server.trustedmodel.MultiReqSeq; +import org.bdware.server.trustedmodel.ResultCollector; +import org.bdware.server.trustedmodel.SlaveNode; import java.util.HashMap; import java.util.HashSet; @@ -131,9 +137,13 @@ public class RequestAllExecutor implements ContractExecutor { } @Override - public void execute(String requestID, ResultCallback rc, ContractRequest req) { + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { LOGGER.debug(JsonUtil.toJson(req)); - + MultiContractMeta meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(req.getContractID()); + if (meta == null || !meta.isMaster()) { + CMActions.manager.executeContractOnOtherNodes(req, rc); + return; + } req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); // 三个相同requestID进来的时候,会有冲突。 @@ -141,6 +151,9 @@ public class RequestAllExecutor implements ContractExecutor { // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 // 如果是多点合约的请求,A1、A2、A3的序号应该一致,不能分配一个新的seq,根据requestID判断是否不需要重新分配一个序号 + //TODO seqMap memory leak + //TODO + //TODO if (null != requestID && requestID.endsWith("_mul")) { synchronized (lock) { if (seqMap.containsKey(requestID)) { diff --git a/src/main/java/org/bdware/server/trustedmodel/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java similarity index 97% rename from src/main/java/org/bdware/server/trustedmodel/MultiPointCooperationExecutor.java rename to src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java index fbbb12b..fe08596 100644 --- a/src/main/java/org/bdware/server/trustedmodel/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/MultiPointCooperationExecutor.java @@ -1,4 +1,4 @@ -package org.bdware.server.trustedmodel; +package org.bdware.server.executor.unconsistency; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @@ -12,6 +12,7 @@ import org.bdware.sc.bean.ContractExecType; import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.FunctionDesp; import org.bdware.sc.bean.RouteInfo; +import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.units.RecoverFlag; @@ -21,6 +22,10 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerRecoverMechAction; import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.server.trustedmodel.ContractExecutor; +import org.bdware.server.trustedmodel.MultiReqSeq; +import org.bdware.server.trustedmodel.ResultCollector; +import org.bdware.server.trustedmodel.SlaveNode; import java.math.BigInteger; import java.util.HashMap; @@ -195,7 +200,7 @@ public class MultiPointCooperationExecutor implements ContractExecutor { } @Override - public void execute(String requestID, ResultCallback rc, ContractRequest req) { + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); // 获得action 函数名 LOGGER.info("action is : " + req.getAction()); diff --git a/src/main/java/org/bdware/server/trustedmodel/RequestOnceExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java similarity index 89% rename from src/main/java/org/bdware/server/trustedmodel/RequestOnceExecutor.java rename to src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java index df0edf5..9bcc541 100644 --- a/src/main/java/org/bdware/server/trustedmodel/RequestOnceExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/RequestOnceExecutor.java @@ -1,14 +1,17 @@ -package org.bdware.server.trustedmodel; +package org.bdware.server.executor.unconsistency; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.server.trustedmodel.ContractExecutor; +import org.bdware.server.trustedmodel.SlaveNode; import java.util.HashMap; import java.util.Map; @@ -22,10 +25,8 @@ public class RequestOnceExecutor implements ContractExecutor { public RequestOnceExecutor(String contractID) { this.contractID = contractID; } - @Override - public void execute(String requestID, ResultCallback rc, ContractRequest req) { - // String contractID = req.getContractID(); + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { ResultCallback cb = new ResultCallback() { @Override diff --git a/src/main/java/org/bdware/server/trustedmodel/ResponseOnceExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java similarity index 92% rename from src/main/java/org/bdware/server/trustedmodel/ResponseOnceExecutor.java rename to src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java index 9ca8a10..ea16df3 100644 --- a/src/main/java/org/bdware/server/trustedmodel/ResponseOnceExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/ResponseOnceExecutor.java @@ -1,4 +1,4 @@ -package org.bdware.server.trustedmodel; +package org.bdware.server.executor.unconsistency; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @@ -6,10 +6,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.sc.ContractResult; import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.server.trustedmodel.ContractExecutor; +import org.bdware.server.trustedmodel.SlaveNode; import java.util.HashMap; import java.util.Map; @@ -25,7 +28,7 @@ public class ResponseOnceExecutor implements ContractExecutor { } @Override - public void execute(String requestID, ResultCallback rc, ContractRequest req) { + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { executeInternal(requestID, rc, req, 2); } diff --git a/src/main/java/org/bdware/server/trustedmodel/RouteEnabledExecutor.java b/src/main/java/org/bdware/server/executor/unconsistency/_UNUSED_RouteEnabledExecutor.java similarity index 88% rename from src/main/java/org/bdware/server/trustedmodel/RouteEnabledExecutor.java rename to src/main/java/org/bdware/server/executor/unconsistency/_UNUSED_RouteEnabledExecutor.java index e782f19..640c0d2 100644 --- a/src/main/java/org/bdware/server/trustedmodel/RouteEnabledExecutor.java +++ b/src/main/java/org/bdware/server/executor/unconsistency/_UNUSED_RouteEnabledExecutor.java @@ -1,4 +1,4 @@ -package org.bdware.server.trustedmodel; +package org.bdware.server.executor.unconsistency; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @@ -9,28 +9,31 @@ import org.bdware.sc.ContractMeta; import org.bdware.sc.bean.ContractRequest; import org.bdware.sc.bean.FunctionDesp; import org.bdware.sc.bean.RouteInfo; +import org.bdware.sc.conn.OnHashCallback; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.units.MultiContractMeta; import org.bdware.sc.util.JsonUtil; import org.bdware.server.action.CMActions; import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.server.trustedmodel.ContractExecutor; +import org.bdware.server.trustedmodel.SlaveNode; import java.math.BigInteger; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -public class RouteEnabledExecutor implements ContractExecutor { - private static final Logger LOGGER = LogManager.getLogger(RouteEnabledExecutor.class); +public class _UNUSED_RouteEnabledExecutor implements ContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(_UNUSED_RouteEnabledExecutor.class); private final String contractID; AtomicInteger order = new AtomicInteger(0); - public RouteEnabledExecutor(String contractID) { + public _UNUSED_RouteEnabledExecutor(String contractID) { this.contractID = contractID; } @Override - public void execute(String requestID, ResultCallback rc, ContractRequest req) { + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hashCallback) { executeInternal(requestID, rc, req, 2); } diff --git a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java index 3a9bd6e..fbc0205 100644 --- a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java @@ -112,7 +112,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { ByteBuf bb = (ByteBuf) frame; JsonObject arg; try { - arg = JsonUtil.parseReaderAsJsonObject(new InputStreamReader(new ByteBufInputStream(bb))); + arg = JsonUtil.parseObject(new InputStreamReader(new ByteBufInputStream(bb))); } catch (Exception e) { e.printStackTrace(); Response response = new Response(); @@ -183,7 +183,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { JsonObject jo = JsonUtil.parseStringAsJsonObject(json); } catch (Exception e) { e.printStackTrace(); - LOGGER.warn("JsonParse Error: " + e.getMessage() + "\n\t" + json); + System.out.println("============[MasterClientFrameHandler]JsonParse Error:" + json); } ByteBuf buf = Unpooled.wrappedBuffer(json.getBytes()); ctx.channel().writeAndFlush(buf); diff --git a/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java b/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java index 64b6ffe..d0a15fa 100644 --- a/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java +++ b/src/main/java/org/bdware/server/trustedmodel/MasterProxy.java @@ -57,42 +57,6 @@ public class MasterProxy implements MasterStub { } } - @Override - public void executeByMaster( - ContractClient client, ResultCallback rcb, ContractRequest request) { - assert client.isUnit(); - // ********** hyy ********** // - // 修改这个地方的执行逻辑,判断路由 - LOGGER.debug(client.getContractType());//sharding - - // ********** hyy ********** // - // assert client.isMaster(); - LOGGER.debug("test location"); - // TODO 这儿有问题。 - // 进来了三个相同的ID的东西。 - MultiContractMeta contractInfo = - CMActions.manager.multiContractRecorder.getMultiContractMeta(client.getContractID()); - long start = System.currentTimeMillis(); - contractInfo.contractExecutor.execute( - request.getRequestID(), - new ResultCallback() { - @Override - public void onResult(String ret) { - JsonObject result = JsonUtil.parseStringAsJsonObject(ret); - ContractManager.instance.extractEventsFromContractResult( - null, result, client, request, start); - LOGGER.debug( - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - .format(new Date(System.currentTimeMillis())) - + " [MasterProxy] executeByMaster 结果是 " - + ret - + "\n"); - rcb.onResult(result); - } - }, - request); - } - @Override public void executeByOtherNodeAsync(String pubKey, ContractRequest c, ResultCallback cb) { MasterConnector handler = CONNECTORS.get(pubKey); diff --git a/src/main/java/org/bdware/units/NetworkManager.java b/src/main/java/org/bdware/units/NetworkManager.java index dd5c2d7..ed1d3a0 100644 --- a/src/main/java/org/bdware/units/NetworkManager.java +++ b/src/main/java/org/bdware/units/NetworkManager.java @@ -185,6 +185,9 @@ public class NetworkManager { public NodeCenterClientHandler getNodeCenterClientHandler() { return nodeCenterClientHandler; } + public void sendToNodeCenter(String msg) { + nodeCenterClientHandler.sendMsg(msg); + } /** * send to all kinds including special receivers @@ -204,9 +207,6 @@ public class NetworkManager { send(unitMessage); } - public void sendToNodeCenter(String msg) { - nodeCenterClientHandler.sendMsg(msg); - } /** * send to TCP nodes, if fail send by p2p diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties deleted file mode 100644 index 054c705..0000000 --- a/src/main/resources/log4j.properties +++ /dev/null @@ -1,7 +0,0 @@ -### 设置### -log4j.rootLogger=info,stdout -### 输出信息到控制台 ### -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{HH:mm:ss.SSS} %m (%F:%L)[%M]%n \ No newline at end of file diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties index 3c01bc8..f3c8a11 100644 --- a/src/main/resources/log4j2.properties +++ b/src/main/resources/log4j2.properties @@ -10,6 +10,6 @@ appender.rolling.append=true appender.rolling.fileName=./log/cm.log appender.rolling.layout.type=PatternLayout appender.rolling.layout.pattern=%d-%m%n -rootLogger.level=info +rootLogger.level=debug rootLogger.appenderRef.stdout.ref=STDOUT rootLogger.appenderRef.log.ref=log \ No newline at end of file