From 969e45668761bc025a234239f27b42af11668a29 Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Mon, 22 Nov 2021 11:50:39 +0800 Subject: [PATCH] merge pbft algorithm --- .../org/bdware/server/action/CMActions.java | 2 +- .../bdware/server/action/MasterWSAction.java | 8 +- .../action/p2p/AliveCheckClientAction.java | 3 +- .../action/p2p/AliveCheckServerAction.java | 12 +- .../p2p/MasterClientRecoverMechAction.java | 40 +- .../action/p2p/MasterClientTCPAction.java | 28 +- .../p2p/MasterServerRecoverMechAction.java | 11 +- .../action/p2p/MasterServerTCPAction.java | 2 +- .../executor/consistency/PBFTExecutor.java | 371 ++++++++++++++++++ .../{ => consistency}/RequestAllExecutor.java | 2 +- .../server/tcp/PubkeyResultCallback.java | 22 ++ .../server/tcp/TCPClientFrameHandler.java | 18 +- .../server/tcp/TCPServerFrameHandler.java | 27 +- .../server/trustedmodel/ContractCluster.java | 35 ++ .../java/org/bdware/units/NetworkManager.java | 37 +- 15 files changed, 570 insertions(+), 48 deletions(-) create mode 100644 src/main/java/org/bdware/server/executor/consistency/PBFTExecutor.java rename src/main/java/org/bdware/server/executor/{ => consistency}/RequestAllExecutor.java (99%) create mode 100644 src/main/java/org/bdware/server/tcp/PubkeyResultCallback.java create mode 100644 src/main/java/org/bdware/server/trustedmodel/ContractCluster.java diff --git a/src/main/java/org/bdware/server/action/CMActions.java b/src/main/java/org/bdware/server/action/CMActions.java index b5d8f66..c48a455 100644 --- a/src/main/java/org/bdware/server/action/CMActions.java +++ b/src/main/java/org/bdware/server/action/CMActions.java @@ -1219,7 +1219,7 @@ public class CMActions implements OnHashCallback { return; } - if (cl.isUnit()) { + if (cl.contractMeta.contract.getType().isUnit()) { LOGGER.debug("killContractProcess : 集群合约 kill"); killContractByMaster(rc.getContractID(), args, resultCallback); diff --git a/src/main/java/org/bdware/server/action/MasterWSAction.java b/src/main/java/org/bdware/server/action/MasterWSAction.java index 2545f9c..5a053ed 100644 --- a/src/main/java/org/bdware/server/action/MasterWSAction.java +++ b/src/main/java/org/bdware/server/action/MasterWSAction.java @@ -25,7 +25,10 @@ import org.bdware.units.NetworkManager; import org.zz.gmhelper.SM2KeyPair; import org.zz.gmhelper.SM2Util; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -41,6 +44,7 @@ public class MasterWSAction { public static boolean hostMaster(String contractID) { return CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).isMaster(); } + private boolean waitForConnection(Set nodeNames) { LOGGER.info("waitForAllNodes:" + JsonUtil.toJson(nodeNames)); for (int i = 0; i < 5; i++) { @@ -66,6 +70,7 @@ public class MasterWSAction { LOGGER.info("waitForAllNodes return false;"); return false; } + @Action(async = true, userPermission = 1L << 26) // zyx modified public void startContractMultiPoint(JsonObject args, final ResultCallback rc) { @@ -262,6 +267,7 @@ public class MasterWSAction { case RequestAllResponseAll: case Sharding: case SelfAdaptiveSharding: + case PBFT: contract.setNumOfCopies(nodeSize); break; default: diff --git a/src/main/java/org/bdware/server/action/p2p/AliveCheckClientAction.java b/src/main/java/org/bdware/server/action/p2p/AliveCheckClientAction.java index 0f7efb4..aeb5adc 100644 --- a/src/main/java/org/bdware/server/action/p2p/AliveCheckClientAction.java +++ b/src/main/java/org/bdware/server/action/p2p/AliveCheckClientAction.java @@ -54,7 +54,6 @@ public class AliveCheckClientAction { HeartBeatUtil.getInstance().cancel(sendPingTask); sendPingTask = null; } - NetworkManager.instance.closeAgent(masterPubkey); } public void checkMasterAlive(ResultCallback rc) { @@ -125,7 +124,7 @@ public class AliveCheckClientAction { System.currentTimeMillis(); } } - closeMaster(); + NetworkManager.instance.closeAgent(masterPubkey); } // if } }; diff --git a/src/main/java/org/bdware/server/action/p2p/AliveCheckServerAction.java b/src/main/java/org/bdware/server/action/p2p/AliveCheckServerAction.java index 6c163fd..a87d0cc 100644 --- a/src/main/java/org/bdware/server/action/p2p/AliveCheckServerAction.java +++ b/src/main/java/org/bdware/server/action/p2p/AliveCheckServerAction.java @@ -20,7 +20,7 @@ public class AliveCheckServerAction { TimerTask checkAliveTask; private long lastSlavePingTime; - String pubKey; + public String pubKey; public AliveCheckServerAction(TCPServerFrameHandler handler) { lastSlavePingTime = @@ -38,11 +38,20 @@ public class AliveCheckServerAction { this.handler = handler; } + public void close() { + if (checkAliveTask != null) { + HeartBeatUtil.getInstance().cancel(checkAliveTask); + checkAliveTask = null; + } + } + private class HeartBeatTask extends TimerTask { int delay; + HeartBeatTask(int delay) { this.delay = delay; } + @Override public void run() { try { @@ -80,6 +89,7 @@ public class AliveCheckServerAction { for (String contractID : contracts) { MasterServerRecoverMechAction.unitModeCheck(contractID); } + NetworkManager.instance.closeAgent(pubKey); } } catch (Exception e) { e.printStackTrace(); diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java index f3151e4..3b0f011 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientRecoverMechAction.java @@ -5,6 +5,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bdware.sc.ContractClient; import org.bdware.sc.ContractManager; +import org.bdware.sc.ContractMeta; import org.bdware.sc.RecoverMechTimeRecorder; import org.bdware.sc.bean.Contract; import org.bdware.sc.conn.ByteUtil; @@ -18,6 +19,8 @@ import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; import org.bdware.server.action.RequestToMaster; +import org.bdware.server.executor.consistency.PBFTExecutor; +import org.bdware.server.executor.consistency.RequestAllExecutor; import org.bdware.server.tcp.TCPClientFrameHandler; import org.bdware.server.trustedmodel.AgentManager; import org.bdware.server.trustedmodel.ContractUnitStatus; @@ -324,6 +327,7 @@ public class MasterClientRecoverMechAction { } LOGGER.info("恢复步骤-----------5 检查是否有合约进程"); + // cei.printContent(); ContractClient client = CMActions.manager.getClient(cei.getContractID()); @@ -338,7 +342,23 @@ public class MasterClientRecoverMechAction { + client.getContractType() + "\n"); } - + ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(cei.getContractID()); + meta.setContractExecutor( + MasterClientTCPAction.createContractExecutor(meta.contract, contractID, cei.getMasterNode(), cei.getMembers())); + switch (meta.contract.getType()) { + case RequestAllResponseFirst: + case RequestAllResponseHalf: + case RequestAllResponseAll: + ((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); + break; + case PBFT: + ((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); + break; + case Sharding: + break; + default: + break; + } // 认为contractID不会重 if (client != null && client.isProcessAlive() @@ -599,7 +619,23 @@ public class MasterClientRecoverMechAction { cei.getContractID())); } CMActions.manager.multiContractRecorder.updateValue(cei); + ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); + switch (meta.contract.getType()) { + case RequestAllResponseFirst: + case RequestAllResponseHalf: + case RequestAllResponseAll: + ((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); + break; + case PBFT: + ((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); + break; + case Sharding: + break; + default: + break; + } boolean flag = checkAndRestart(cei); // if need,restart the contract process + if (!flag) { return; } @@ -638,6 +674,8 @@ public class MasterClientRecoverMechAction { cei.setLastExeSeq(lastExeSeq); // 通过数据库中值设置lastExeSeq,和本地记录的ContractRecord是同步的 LOGGER.info("本地节点恢复完成之后 lastExeSeq = " + cei.getLastExeSeq()); } + + // String m1 = CMActions.manager.dumpContract(contractID, ""); // System.out.println("从本地transRecods恢复之后状态为 \n" + m1); diff --git a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java index 174e4b9..b2c8bcb 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterClientTCPAction.java @@ -9,17 +9,21 @@ import org.bdware.sc.*; import org.bdware.sc.bean.Contract; import org.bdware.sc.bean.ContractExecType; import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.ByteUtil; import org.bdware.sc.conn.ResultCallback; import org.bdware.sc.db.CMTables; import org.bdware.sc.db.KeyValueDBUtil; import org.bdware.sc.units.MultiContractMeta; +import org.bdware.sc.units.PubKeyNode; import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.*; -import org.bdware.server.executor.RequestAllExecutor; +import org.bdware.server.executor.consistency.PBFTExecutor; +import org.bdware.server.executor.consistency.RequestAllExecutor; import org.bdware.server.executor.unconsistency.MultiPointCooperationExecutor; import org.bdware.server.executor.unconsistency.RequestOnceExecutor; import org.bdware.server.executor.unconsistency.ResponseOnceExecutor; +import org.bdware.server.tcp.PubkeyResultCallback; import org.bdware.server.trustedmodel.ContractExecutor; import org.bdware.server.trustedmodel.KillUnitContractInfo; import org.bdware.server.trustedmodel.SelfAdaptiveShardingExecutor; @@ -82,7 +86,7 @@ public class MasterClientTCPAction { LOGGER.info("认为合约 " + contractID + " 的master崩溃 当前master为null 向NC发送重选信息"); } - public static ContractExecutor createContractExecutor(Contract contract, String contractID) { + public static ContractExecutor createContractExecutor(Contract contract, String contractID, String masterPubkey, String[] members) { ContractExecutor executor = null; int nodeSize = contract.getNumOfCopies(); switch (contract.getType()) { @@ -118,6 +122,9 @@ public class MasterClientTCPAction { case SelfAdaptiveSharding: executor = new SelfAdaptiveShardingExecutor(contractID); break; + case PBFT: + executor = new PBFTExecutor(nodeSize, contractID, masterPubkey, members); + break; } return executor; } @@ -253,7 +260,8 @@ public class MasterClientTCPAction { LOGGER.info("启动结果为 " + ret); CMActions.manager.multiContractRecorder.updateValue(cei); ContractMeta meta = CMActions.manager.statusRecorder.createIfNotExist(contractID); - meta.setContractExecutor(createContractExecutor(contract, contractID)); // 分配不同的Executor + + meta.setContractExecutor(createContractExecutor(contract, contractID, jo.get("master").getAsString(), cei.getMembers())); // 分配不同的Executor // TODO 合约终止后从数据库中移除,但是为了测试可以人为制造合约终止但不从数据库中移除(异常停止) KeyValueDBUtil.instance.setValue(CMTables.UnitContracts.toString(), contractID, "exist"); @@ -579,6 +587,20 @@ public class MasterClientTCPAction { jsonObject.get("responseID").getAsString(), jsonObject.get("data").getAsString()); } + @Action(async = true) + public void contractSyncMessage(JsonObject jsonObject, ResultCallback resultCallback) { + String contractID = jsonObject.get("contractID").getAsString(); + MultiContractMeta mcm = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); + ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); + byte[] data = ByteUtil.decodeBASE64(jsonObject.get("data").getAsString()); + PubKeyNode node = new PubKeyNode(); + if (resultCallback instanceof PubkeyResultCallback) { + PubkeyResultCallback p = (PubkeyResultCallback) resultCallback; + node.pubkey = p.getPubkey(); + } + meta.contractExecutor.onSyncMessage(node, data); + } + @Action(async = true) public void reRouteContract(JsonObject jo, ResultCallback result) { LOGGER.info("Receive Reroute Info:" + jo.toString()); diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java index 38f6025..fdbdb2e 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerRecoverMechAction.java @@ -16,7 +16,8 @@ import org.bdware.sc.util.JsonUtil; import org.bdware.server.GlobalConf; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; -import org.bdware.server.executor.RequestAllExecutor; +import org.bdware.server.executor.consistency.PBFTExecutor; +import org.bdware.server.executor.consistency.RequestAllExecutor; import org.bdware.server.trustedmodel.ContractUnitStatus; import org.bdware.units.NetworkManager; import org.zz.gmhelper.SM2KeyPair; @@ -294,14 +295,18 @@ public class MasterServerRecoverMechAction { ContractMeta meta = CMActions.manager.statusRecorder.getContractMeta(contractID); meta.setContractExecutor( - MasterClientTCPAction.createContractExecutor(meta.contract, contractID)); + MasterClientTCPAction.createContractExecutor(meta.contract, contractID, cei.getMasterNode(), cei.getMembers())); switch (meta.contract.getType()) { case RequestAllResponseFirst: case RequestAllResponseHalf: case RequestAllResponseAll: - case Sharding: ((RequestAllExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); break; + case PBFT: + ((PBFTExecutor) meta.contractExecutor).setSeq(cei.getLastExeSeq() + 1); + break; + case Sharding: + break; default: break; } diff --git a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java index 890c1ea..9afa5cd 100644 --- a/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java +++ b/src/main/java/org/bdware/server/action/p2p/MasterServerTCPAction.java @@ -18,7 +18,7 @@ import org.bdware.server.CongestionControl; import org.bdware.server.action.Action; import org.bdware.server.action.CMActions; import org.bdware.server.action.SyncResult; -import org.bdware.server.executor.RequestAllExecutor; +import org.bdware.server.executor.consistency.RequestAllExecutor; import org.bdware.server.trustedmodel.KillUnitContractResultCollector; import org.bdware.server.trustedmodel.ResultCollector; import org.bdware.units.NetworkManager; diff --git a/src/main/java/org/bdware/server/executor/consistency/PBFTExecutor.java b/src/main/java/org/bdware/server/executor/consistency/PBFTExecutor.java new file mode 100644 index 0000000..0833b37 --- /dev/null +++ b/src/main/java/org/bdware/server/executor/consistency/PBFTExecutor.java @@ -0,0 +1,371 @@ +package org.bdware.server.executor.consistency; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.sc.ComponedContractResult; +import org.bdware.sc.ContractResult; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.Node; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.sequencing.*; +import org.bdware.sc.units.*; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.GlobalConf; +import org.bdware.server.action.CMActions; +import org.bdware.server.action.p2p.MasterServerRecoverMechAction; +import org.bdware.server.action.p2p.MasterServerTCPAction; +import org.bdware.server.trustedmodel.ContractCluster; +import org.bdware.server.trustedmodel.ContractExecutor; +import org.bdware.server.trustedmodel.MultiReqSeq; +import org.bdware.server.trustedmodel.ResultCollector; +import org.bdware.units.NetworkManager; +import org.zz.gmhelper.SM2KeyPair; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +//TODO 追赶差下的调用 +public class PBFTExecutor implements ContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class); + final Object lock = new Object(); + private final List members; + int resultCount; + + AtomicInteger request_index = new AtomicInteger(0); + // key为requestID,value为其seq + Map seqMap = new ConcurrentHashMap<>(); + Map resultCache = new ConcurrentHashMap<>(); + // MultiPointContractInfo info; + String contractID; + PBFTAlgorithm pbft; + ContractCluster contractCluster; + boolean isMaster; + + public PBFTExecutor( + int c, String con_id, final String masterPubkey, String[] members) { + resultCount = c; + contractID = con_id; + this.members = new ArrayList<>(); + isMaster = GlobalConf.getNodeID().equals(masterPubkey); + pbft = new PBFTAlgorithm(isMaster); + int count = 0; + for (String mem : members) { + PubKeyNode pubkeyNode = new PubKeyNode(); + pubkeyNode.pubkey = mem; + PBFTMember pbftMember = new PBFTMember(); + pbftMember.isMaster = mem.equals(masterPubkey); + pbft.addMember(pubkeyNode, pbftMember); + this.members.add(pubkeyNode); + if (GlobalConf.getNodeID().equals(mem)) { + pbft.setSendID(count); + } + count++; + } + contractCluster = new ContractCluster(contractID, this.members); + pbft.setConnection(contractCluster); + final MultiContractMeta cei = CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID); + pbft.setCommitter(new Committer() { + @Override + public void onCommit(ContractRequest data) { + ResultCallback ret = null; + final long startTime = System.currentTimeMillis(); + ret = new ResultCallback() { + @Override + public void onResult(String str) { + Map ret = new HashMap<>(); + ret.put("action", "receiveTrustfullyResult"); + SM2KeyPair keyPair = GlobalConf.instance.keyPair; + ret.put("nodeID", keyPair.getPublicKeyStr()); + ret.put("responseID", data.getRequestID()); + ret.put("executeTime", (System.currentTimeMillis() - startTime) + ""); + ret.put("data", str); + cei.setLastExeSeq(data.seq); + NetworkManager.instance.sendToAgent(masterPubkey, JsonUtil.toJson(ret)); + } + }; + CMActions.manager.executeLocallyAsync(data, ret, null); + } + }); + } + + public void onSyncMessage(Node node, byte[] data) { + + pbft.onMessage(node, data); + } + + public void setSeq(int seq) { + request_index = new AtomicInteger(seq); + pbft.setAtomSeq(request_index.get()); + } + + public ResultCallback createResultCallback( + final String requestID, + final ResultCallback originalCb, + final int count, + final int request_seq, + final String contractID) { + ComponedContractResult componedContractResult = new ComponedContractResult(count); + // TODO 加对应的超时? + return new ResultCollector( + requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); + } + + public void sendRequest(String id, ContractRequest req, ResultCallback collector) { +// Map reqStr = new HashMap<>(); +// reqStr.put("uniReqID", id); +// reqStr.put("data", req); +// reqStr.put("action", "executeContractLocally"); + ContractRequest cr2 = ContractRequest.parse(req.toByte()); + cr2.setRequestID(id); + PBFTMessage request = new PBFTMessage(); + request.setOrder(req.seq); + request.setType(PBFTType.Request); + request.setContent(cr2.toByte()); + for (PubKeyNode node : members) { + if (!NetworkManager.instance.hasAgentConnection(node.pubkey)) { + LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null"); + collector.onResult( + "{\"status\":\"Error\",\"result\":\"node offline\"," + + "\"nodeID\":\"" + + node + + "\"," + + "\"action\":\"onExecuteContractTrustfully\"}"); +// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) +// != RecoverFlag.Fine) { +// collector.onResult( +// "{\"status\":\"Error\",\"result\":\"node recovering\"," +// + "\"nodeID\":\"" +// + node +// + "\"," +// + "\"action\":\"onExecuteContractTrustfully\"}"); +// contractCluster.sendMessage(node, request.getBytes()); + } else { + contractCluster.sendMessage(node, request.getBytes()); + } + } + // master负责缓存请求 + if (!MasterServerTCPAction.requestCache.containsKey(contractID)) { + MasterServerTCPAction.requestCache.put(contractID, new RequestCache()); + } + // TODO 多调多统一个seq的有多个请求,这个需要改 + String[] nodes = + CMActions.manager.multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); + LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); + + } + + + public boolean checkCurNodeNumValid() { + return true; + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + LOGGER.debug(JsonUtil.toJson(req)); + MultiContractMeta meta = CMActions.manager.multiContractRecorder.getMultiContractMeta(req.getContractID()); + if (meta == null || !meta.isMaster()) { + CMActions.manager.executeContractOnOtherNodes(req, rc); + return; + } + req.setContractID(CMActions.manager.getContractIDByName(req.getContractID())); + + // 三个相同requestID进来的时候,会有冲突。 + // 仅在此处有冲突么? + // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 + + // 如果是多点合约的请求,A1、A2、A3的序号应该一致,不能分配一个新的seq,根据requestID判断是否不需要重新分配一个序号 + //TODO seqMap memory leak + //TODO + //TODO + if (null != requestID && requestID.endsWith("_mul")) { + synchronized (lock) { + if (seqMap.containsKey(requestID)) { + req.seq = seqMap.get(requestID).seq; + } else { + req.seq = request_index.getAndIncrement(); + seqMap.put(requestID, new MultiReqSeq(req.seq)); + } + } + } else { + req.seq = request_index.getAndIncrement(); + } + req.needSeq = true; + String id = + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; + LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id); + + if (checkCurNodeNumValid()) { + LOGGER.debug("checkCurNodeNumValid=true"); + ResultCallback collector = + createResultCallback(id, rc, resultCount, req.seq, req.getContractID()); + MasterServerTCPAction.sync.sleep(id, collector); + LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); + sendRequest(id, req, collector); + } else { + LOGGER.debug("invalidNodeNumOnResult"); + request_index.getAndDecrement(); + ContractResult finalResult = + new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("node number unavailable, request refused.")); + rc.onResult(JsonUtil.toJson(finalResult)); + } + + // } + + /* // 三个相同requestID进来的时候,会有冲突。 + // 仅在此处有冲突么? + // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 + req.seq = request_index.getAndIncrement(); + req.needSeq = true; + ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); + MasterServerTCPAction.sync.sleep(id, collector); + sendRequest(id, req, collector);*/ + } + + // 清理缓存的多点合约请求序号 + public void clearCache() { + final long time = System.currentTimeMillis() - 30000L; + seqMap.entrySet() + .removeIf( + entry -> { + MultiReqSeq cache = entry.getValue(); + if (null == cache) { + return true; + } + return cache.startTime < time; + }); + } + + public static class ResultMerger extends ResultCallback { + ComponedContractResult componedContractResult; + AtomicInteger order; + String contractID; + int count; + int request_seq; + ResultCallback originalCallback; + Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 + + ResultMerger( + final ResultCallback originalCb, + final int count, + final int request_seq, + final String contractID) { + originalCallback = originalCb; + this.count = count; + this.request_seq = request_seq; + this.contractID = contractID; + componedContractResult = new ComponedContractResult(count); + order = new AtomicInteger(0); + } + + public String getContractID() { + return contractID; + } + + public String getInfo() { + return "contractID=" + + contractID + + " 收到第 " + + order + + " 个节点回复 : " + + " order=" + + order + + " count=" + + count + + " "; + } + + @Override + public void onResult(String str) { + // TODO 必须在这里聚合。 + // str的data是个ContractResult + // 在这儿也是返回个ContractResult + try { + LOGGER.debug("a result of contract" + contractID + ": " + str); + JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); + if (obj.has("nodeID")) { + String id = obj.get("nodeID").getAsString(); + if (nodeIDs.contains(id)) { + LOGGER.debug( + "ignored result because the result of node " + + id.substring(0, 5) + + " has been received"); + return; + } + nodeIDs.add(id); + } + + LOGGER.debug( + String.format( + "contractID=%s received=%s order=%d count=%d", + contractID, str, order.get(), count)); + componedContractResult.add(obj); + // 收集到所有结果 + if (order.incrementAndGet() == count) { + ContractResult finalResult = componedContractResult.figureFinalResult(); + finalResult.needSeq = true; + finalResult.seq = request_seq; + + // if (null == finalResult) { + // finalResult = + // new ContractResult( + // ContractResult.Status.Exception, + // new JsonPrimitive( + // "no nore than half of the + // consistent result")); + // originalCallback.onResult(new + // Gson().toJson(finalResult)); + // } else { + originalCallback.onResult(JsonUtil.toJson(finalResult)); + // } + LOGGER.debug( + String.format( + "%d results are the same: %s", + finalResult.size, finalResult.result)); + + // 集群中事务序号+1 + CMActions.manager.multiContractRecorder + .getMultiContractMeta(contractID) + .nextSeqAtMaster(); + + // recover,其中无状态合约CP出错无需恢复 + Set nodesID = componedContractResult.getProblemNodes(); + if (null == nodesID || nodesID.isEmpty()) { + return; + } + for (String nodeID : nodesID) { + LOGGER.warn("node fails! " + nodeID); + if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) + == RecoverFlag.Fine) { + MasterServerRecoverMechAction.recoverStatus + .get(nodeID) + .put(contractID, RecoverFlag.ToRecover); + } + } + for (String nodeID : nodesID) { + if (MasterServerRecoverMechAction.recoverStatus.get(nodeID).get(contractID) + == RecoverFlag.ToRecover) { + LOGGER.warn("node in recover " + nodeID); + + // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 + // 直接通过load别的节点来恢复 + MasterServerRecoverMechAction.restartContractFromCommonMode( + nodeID, contractID); + } + } + } + // clearCache(); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("result exception!"); + } + } + } +} diff --git a/src/main/java/org/bdware/server/executor/RequestAllExecutor.java b/src/main/java/org/bdware/server/executor/consistency/RequestAllExecutor.java similarity index 99% rename from src/main/java/org/bdware/server/executor/RequestAllExecutor.java rename to src/main/java/org/bdware/server/executor/consistency/RequestAllExecutor.java index ca6bd88..ab217c6 100644 --- a/src/main/java/org/bdware/server/executor/RequestAllExecutor.java +++ b/src/main/java/org/bdware/server/executor/consistency/RequestAllExecutor.java @@ -1,4 +1,4 @@ -package org.bdware.server.executor; +package org.bdware.server.executor.consistency; import com.google.gson.JsonObject; import com.google.gson.JsonParser; diff --git a/src/main/java/org/bdware/server/tcp/PubkeyResultCallback.java b/src/main/java/org/bdware/server/tcp/PubkeyResultCallback.java new file mode 100644 index 0000000..b360d75 --- /dev/null +++ b/src/main/java/org/bdware/server/tcp/PubkeyResultCallback.java @@ -0,0 +1,22 @@ +package org.bdware.server.tcp; + +import org.bdware.sc.conn.ResultCallback; + +public class PubkeyResultCallback extends ResultCallback { + String pubkey; + ResultCallback rc; + + PubkeyResultCallback(String pubkey, ResultCallback rc) { + this.pubkey = pubkey; + this.rc = rc; + } + + public String getPubkey() { + return pubkey; + } + + @Override + public void onResult(String str) { + rc.onResult(str); + } +} \ No newline at end of file diff --git a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java index a05bb13..97e579b 100644 --- a/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPClientFrameHandler.java @@ -72,6 +72,7 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { public void close() { try { + aliveCheckClientAction.closeMaster(); if (channel != null) channel.close(); } catch (Exception e) { e.printStackTrace(); @@ -111,17 +112,18 @@ public class TCPClientFrameHandler extends SimpleChannelInboundHandler { } Response response; try { - LOGGER.info("====== TCPClientFrameHandler:" + arg.toString()); + //LOGGER.info("====== TCPClientFrameHandler:" + arg.toString()); final String action = arg.get("action").getAsString(); + PubkeyResultCallback pc = new PubkeyResultCallback(master, new ResultCallback() { + @Override + public void onResult(String ret) { + sendMsg(ret); + } + }); ae.handle( action, - arg, - new ResultCallback() { - @Override - public void onResult(String ret) { - sendMsg(ret); - } - }); + arg, pc + ); } catch (IllegalArgumentException e) { response = new Response(); response.action = "onException"; diff --git a/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java b/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java index e0da824..ea0114b 100644 --- a/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java +++ b/src/main/java/org/bdware/server/tcp/TCPServerFrameHandler.java @@ -25,14 +25,17 @@ import java.util.concurrent.Executors; public class TCPServerFrameHandler extends SimpleChannelInboundHandler { private static final Logger LOGGER = LogManager.getLogger(TCPServerFrameHandler.class); static ExecutorService executorService = Executors.newFixedThreadPool(10); + private final AliveCheckServerAction checkAction; public ChannelHandlerContext ctx; public ActionExecutor ae; + public TCPServerFrameHandler() { + checkAction = new AliveCheckServerAction(this); ae = new ActionExecutor( - executorService, new AliveCheckServerAction(this), + executorService, checkAction, new MasterClientTCPAction(), new MasterClientRecoverMechAction(), MasterClientTransferAction.instance, - new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction() ) { + new MasterServerRecoverMechAction(), new MasterServerTransferAction(), new MasterServerTCPAction()) { @Override public boolean checkPermission( Action a, final JsonObject args, long permission) { @@ -108,15 +111,17 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler { try { final String action = arg.get("action").getAsString(); + + PubkeyResultCallback pubkeyResultCallback = new PubkeyResultCallback(checkAction.pubKey, new ResultCallback() { + @Override + public void onResult(String ret) { + sendMsg(ret); + } + }); ae.handle( action, - arg, - new ResultCallback() { - @Override - public void onResult(String ret) { - sendMsg(ret); - } - }); + arg, pubkeyResultCallback + ); } catch (IllegalArgumentException e) { response = new Response(); @@ -161,6 +166,10 @@ public class TCPServerFrameHandler extends SimpleChannelInboundHandler { } } + public void close() { + checkAction.close(); + } + static class Response { public String cid; diff --git a/src/main/java/org/bdware/server/trustedmodel/ContractCluster.java b/src/main/java/org/bdware/server/trustedmodel/ContractCluster.java new file mode 100644 index 0000000..d1294d0 --- /dev/null +++ b/src/main/java/org/bdware/server/trustedmodel/ContractCluster.java @@ -0,0 +1,35 @@ +package org.bdware.server.trustedmodel; + +import com.google.gson.JsonObject; +import org.bdware.sc.conn.ByteUtil; +import org.bdware.sc.units.PubKeyNode; +import org.bdware.sc.units.TrustfulExecutorConnection; +import org.bdware.units.NetworkManager; + +import java.util.ArrayList; +import java.util.List; + +public class ContractCluster implements TrustfulExecutorConnection { + private final List members; + String contractID; + + public ContractCluster(String contractID, List members) { + this.members = new ArrayList<>(); + this.members.addAll(members); + this.contractID = contractID; + } + + @Override + public void sendMessage(PubKeyNode node, byte[] msg) { + JsonObject jo = new JsonObject(); + jo.addProperty("action", "contractSyncMessage"); + jo.addProperty("contractID", contractID); + jo.addProperty("data", ByteUtil.encodeBASE64(msg)); + NetworkManager.instance.sendToAgent(node.pubkey, jo.toString()); + } + + @Override + public List getNodes() { + return members; + } +} diff --git a/src/main/java/org/bdware/units/NetworkManager.java b/src/main/java/org/bdware/units/NetworkManager.java index b3e22d5..1b65b7a 100644 --- a/src/main/java/org/bdware/units/NetworkManager.java +++ b/src/main/java/org/bdware/units/NetworkManager.java @@ -189,21 +189,20 @@ public class NetworkManager { } public void registerConnection(String nodeID, TCPServerFrameHandler handler) { - LOGGER.info("nodeID:"+nodeID+" connected!!"); + LOGGER.info("nodeID:" + nodeID + " connected!!"); SERVERCONNECTORS.put(nodeID, handler); } public void closeAgent(String agentPubkey) { - //TODO -// if (handler != null) { -// try { -// handler.close(); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } - //AliveCheckAction.closeMaster(); - NetworkManager.CONNECTORS.remove(agentPubkey); + if (NetworkManager.SERVERCONNECTORS.containsKey(agentPubkey)) { + NetworkManager.SERVERCONNECTORS.get(agentPubkey).close(); + NetworkManager.SERVERCONNECTORS.remove(agentPubkey); + } + if (NetworkManager.CONNECTORS.containsKey(agentPubkey)) { + NetworkManager.CONNECTORS.get(agentPubkey).handler.close(); + NetworkManager.CONNECTORS.remove(agentPubkey); + } + } public void connectToAgent(String master, String contractID) { @@ -260,13 +259,17 @@ public class NetworkManager { } public void sendToAgent(String pubkey, String content) { - if (sendToAgentByServer(pubkey, content)) { - return; + try { + if (sendToAgentByServer(pubkey, content)) { + return; + } + if (!hasAgentConnection(pubkey)) { + nodeCenterClientHandler.getController().connectToNode(pubkey); + } + CONNECTORS.get(pubkey).handler.sendMsg(content); + } catch (Exception e) { + e.printStackTrace(); } - if (!hasAgentConnection(pubkey)) { - nodeCenterClientHandler.getController().connectToNode(pubkey); - } - CONNECTORS.get(pubkey).handler.sendMsg(content); } private boolean sendToAgentByServer(String pubkey, String content) {