From 90a11455788064856baf035273c8f49706e759cc Mon Sep 17 00:00:00 2001 From: CaiHQ Date: Thu, 17 Feb 2022 17:07:01 +0800 Subject: [PATCH] initial commit --- .gitignore | 26 ++ build.gradle | 25 ++ .../AbstractContextContractExecutor.java | 13 + .../plugin/pbft/ContractCluster.java | 35 ++ .../consistency/plugin/pbft/PBFTExecutor.java | 374 +++++++++++++++++ .../plugin/pbft/PBFTExecutorFactory.java | 22 + .../plugin/ra/RequestAllExecutor.java | 343 +++++++++++++++ .../ra/RequestAllResponseAllFactory.java | 21 + .../ra/RequestAllResponseFirstFactory.java | 21 + .../ra/RequestAllResponseHalfFactory.java | 24 ++ .../plugin/ro/RequestOnceExecutor.java | 63 +++ .../plugin/ro/RequestOnceExecutorFactory.java | 19 + .../plugin/ro/ResponseOnceExecutor.java | 80 ++++ .../ro/ResponseOnceExecutorFactory.java | 19 + .../MultiPointCooperationExecutor.java | 395 ++++++++++++++++++ .../MultiPointCooperationExecutorFactory.java | 21 + .../SelfAdaptiveShardingExecutor.java | 308 ++++++++++++++ .../SelfAdaptiveShardingExecutorFactory.java | 19 + .../single/SingleNodeExecutorFactory.java | 24 ++ 19 files changed, 1852 insertions(+) create mode 100644 .gitignore create mode 100644 build.gradle create mode 100644 src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java create mode 100644 src/main/java/org/bdware/consistency/plugin/pbft/ContractCluster.java create mode 100644 src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java create mode 100644 src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutorFactory.java create mode 100644 src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java create mode 100644 src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseAllFactory.java create mode 100644 src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java create mode 100644 src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java create mode 100644 src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutor.java create mode 100644 src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutorFactory.java create mode 100644 src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java create mode 100644 src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutorFactory.java create mode 100644 src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java create mode 100644 src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutorFactory.java create mode 100644 src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java create mode 100644 src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutorFactory.java create mode 100644 src/main/java/org/bdware/consistency/plugin/single/SingleNodeExecutorFactory.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e9e941c --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +/build/ +/testoutput/ +*/build/* +# Compiled class file +*.class +.DS_Store +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..2a95d70 --- /dev/null +++ b/build.gradle @@ -0,0 +1,25 @@ +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(":consistency-sdk") + + testImplementation 'junit:junit:4.13.2' +} + +jar { + String libs = '' + configurations.runtimeClasspath.each { + libs = libs + " libs/" + it.name + } + + manifest { + attributes 'Manifest-Version': archiveVersion + attributes 'Class-Path': libs + } +} \ No newline at end of file diff --git a/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java b/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java new file mode 100644 index 0000000..76f69ea --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java @@ -0,0 +1,13 @@ +package org.bdware.consistency.plugin.common; + +import org.bdware.sdk.consistency.ConsistencyPluginManager; +import org.bdware.sdk.consistency.api.context.*; +import org.bdware.server.trustedmodel.ContractExecutor; + +public abstract class AbstractContextContractExecutor implements ContractExecutor { + static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf(); + static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions(); + static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager(); + static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction(); + static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction(); +} diff --git a/src/main/java/org/bdware/consistency/plugin/pbft/ContractCluster.java b/src/main/java/org/bdware/consistency/plugin/pbft/ContractCluster.java new file mode 100644 index 0000000..48fdada --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/pbft/ContractCluster.java @@ -0,0 +1,35 @@ +package org.bdware.consistency.plugin.pbft; + +import com.google.gson.JsonObject; +import org.bdware.sc.conn.ByteUtil; +import org.bdware.sc.units.PubKeyNode; +import org.bdware.sc.units.TrustfulExecutorConnection; +import org.bdware.sdk.consistency.ConsistencyPluginManager; + +import java.util.ArrayList; +import java.util.List; + +public class ContractCluster implements TrustfulExecutorConnection { + private final List members; + String contractID; + + public ContractCluster(String contractID, List members) { + this.members = new ArrayList<>(); + this.members.addAll(members); + this.contractID = contractID; + } + + @Override + public void sendMessage(PubKeyNode node, byte[] msg) { + JsonObject jo = new JsonObject(); + jo.addProperty("action", "contractSyncMessage"); + jo.addProperty("contractID", contractID); + jo.addProperty("data", ByteUtil.encodeBASE64(msg)); + ConsistencyPluginManager.getInstance().getContext().getNetworkManager().sendToAgent(node.pubkey, jo.toString()); + } + + @Override + public List getNodes() { + return members; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java b/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java new file mode 100644 index 0000000..df1687a --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java @@ -0,0 +1,374 @@ +package org.bdware.consistency.plugin.pbft; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; +import org.bdware.sc.ComponedContractResult; +import org.bdware.sc.ContractResult; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.Node; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.consistency.Committer; +import org.bdware.sc.consistency.pbft.PBFTAlgorithm; +import org.bdware.sc.consistency.pbft.PBFTMember; +import org.bdware.sc.consistency.pbft.PBFTMessage; +import org.bdware.sc.consistency.pbft.PBFTType; +import org.bdware.sc.units.*; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.trustedmodel.MultiReqSeq; +import org.zz.gmhelper.SM2KeyPair; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +//TODO 追赶差下的调用 +public class PBFTExecutor extends AbstractContextContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class); + final Object lock = new Object(); + private final List members; + int resultCount; + + AtomicInteger request_index = new AtomicInteger(0); + // key为requestID,value为其seq + Map seqMap = new ConcurrentHashMap<>(); + Map resultCache = new ConcurrentHashMap<>(); + // MultiPointContractInfo info; + String contractID; + PBFTAlgorithm pbft; + ContractCluster contractCluster; + boolean isMaster; + + public PBFTExecutor( + int c, String con_id, final String masterPubkey, String[] members) { + resultCount = c; + contractID = con_id; + this.members = new ArrayList<>(); + isMaster = globalConf.getNodeID().equals(masterPubkey); + pbft = new PBFTAlgorithm(isMaster); + int count = 0; + for (String mem : members) { + PubKeyNode pubkeyNode = new PubKeyNode(); + pubkeyNode.pubkey = mem; + PBFTMember pbftMember = new PBFTMember(); + pbftMember.isMaster = mem.equals(masterPubkey); + pbft.addMember(pubkeyNode, pbftMember); + this.members.add(pubkeyNode); + if (globalConf.getNodeID().equals(mem)) { + pbft.setSendID(count); + } + count++; + } + contractCluster = new ContractCluster(contractID, this.members); + pbft.setConnection(contractCluster); + final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); + pbft.setCommitter(new Committer() { + @Override + public void onCommit(ContractRequest data) { + ResultCallback ret = null; + final long startTime = System.currentTimeMillis(); + ret = new ResultCallback() { + @Override + public void onResult(String str) { + Map ret = new HashMap<>(); + ret.put("action", "receiveTrustfullyResult"); + SM2KeyPair keyPair = globalConf.getKeyPair(); + ret.put("nodeID", keyPair.getPublicKeyStr()); + ret.put("responseID", data.getRequestID()); + ret.put("executeTime", (System.currentTimeMillis() - startTime) + ""); + ret.put("data", str); + cei.setLastExeSeq(data.seq); + networkManager.sendToAgent(masterPubkey, JsonUtil.toJson(ret)); + } + }; + cmActions.getManager().executeLocallyAsync(data, ret, null); + } + }); + } + + public void onSyncMessage(Node node, byte[] data) { + + pbft.onMessage(node, data); + } + + public void setSeq(int seq) { + request_index = new AtomicInteger(seq); + pbft.setAtomSeq(request_index.get()); + } + + public ResultCallback createResultCallback( + final String requestID, + final ResultCallback originalCb, + final int count, + final int request_seq, + final String contractID) { + ComponedContractResult componedContractResult = new ComponedContractResult(count); + // TODO 加对应的超时? + return networkManager.createResultCallback( + requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); + } + + public void sendRequest(String id, ContractRequest req, ResultCallback collector) { +// Map reqStr = new HashMap<>(); +// reqStr.put("uniReqID", id); +// reqStr.put("data", req); +// reqStr.put("action", "executeContractLocally"); + ContractRequest cr2 = ContractRequest.parse(req.toByte()); + cr2.setRequestID(id); + PBFTMessage request = new PBFTMessage(); + request.setOrder(req.seq); + request.setType(PBFTType.Request); + request.setContent(cr2.toByte()); + for (PubKeyNode node : members) { + if (!networkManager.hasAgentConnection(node.pubkey)) { + LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null"); + collector.onResult( + "{\"status\":\"Error\",\"result\":\"node offline\"," + + "\"nodeID\":\"" + + node + + "\"," + + "\"action\":\"onExecuteContractTrustfully\"}"); +// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) +// != RecoverFlag.Fine) { +// collector.onResult( +// "{\"status\":\"Error\",\"result\":\"node recovering\"," +// + "\"nodeID\":\"" +// + node +// + "\"," +// + "\"action\":\"onExecuteContractTrustfully\"}"); +// contractCluster.sendMessage(node, request.getBytes()); + } else { + contractCluster.sendMessage(node, request.getBytes()); + } + } + // master负责缓存请求 + if (!masterServerTCPAction.getReqCache().containsKey(contractID)) { + masterServerTCPAction.getReqCache().put(contractID, new RequestCache()); + } + // TODO 多调多统一个seq的有多个请求,这个需要改 + String[] nodes = + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); + LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); + + } + + + public boolean checkCurNodeNumValid() { + return true; + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + LOGGER.debug(JsonUtil.toJson(req)); + MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); + if (meta == null || !meta.isMaster()) { + cmActions.getManager().executeContractOnOtherNodes(req, rc); + return; + } + req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID())); + + // 三个相同requestID进来的时候,会有冲突。 + // 仅在此处有冲突么? + // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 + + // 如果是多点合约的请求,A1、A2、A3的序号应该一致,不能分配一个新的seq,根据requestID判断是否不需要重新分配一个序号 + //TODO seqMap memory leak + //TODO + //TODO + if (null != requestID && requestID.endsWith("_mul")) { + synchronized (lock) { + if (seqMap.containsKey(requestID)) { + req.seq = seqMap.get(requestID).seq; + } else { + req.seq = request_index.getAndIncrement(); + seqMap.put(requestID, new MultiReqSeq(req.seq)); + } + } + } else { + req.seq = request_index.getAndIncrement(); + } + req.needSeq = true; + String id = + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; + LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id); + + if (checkCurNodeNumValid()) { + LOGGER.debug("checkCurNodeNumValid=true"); + ResultCallback collector = + createResultCallback(id, rc, resultCount, req.seq, req.getContractID()); + masterServerTCPAction.getSync().sleep(id, collector); + LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); + sendRequest(id, req, collector); + } else { + LOGGER.debug("invalidNodeNumOnResult"); + request_index.getAndDecrement(); + ContractResult finalResult = + new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("node number unavailable, request refused.")); + rc.onResult(JsonUtil.toJson(finalResult)); + } + + // } + + /* // 三个相同requestID进来的时候,会有冲突。 + // 仅在此处有冲突么? + // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 + req.seq = request_index.getAndIncrement(); + req.needSeq = true; + ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); + MasterServerTCPAction.sync.sleep(id, collector); + sendRequest(id, req, collector);*/ + } + + // 清理缓存的多点合约请求序号 + public void clearCache() { + final long time = System.currentTimeMillis() - 30000L; + seqMap.entrySet() + .removeIf( + entry -> { + MultiReqSeq cache = entry.getValue(); + if (null == cache) { + return true; + } + return cache.startTime < time; + }); + } + + public static class ResultMerger extends ResultCallback { + ComponedContractResult componedContractResult; + AtomicInteger order; + String contractID; + int count; + int request_seq; + ResultCallback originalCallback; + Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 + + ResultMerger( + final ResultCallback originalCb, + final int count, + final int request_seq, + final String contractID) { + originalCallback = originalCb; + this.count = count; + this.request_seq = request_seq; + this.contractID = contractID; + componedContractResult = new ComponedContractResult(count); + order = new AtomicInteger(0); + } + + public String getContractID() { + return contractID; + } + + public String getInfo() { + return "contractID=" + + contractID + + " 收到第 " + + order + + " 个节点回复 : " + + " order=" + + order + + " count=" + + count + + " "; + } + + @Override + public void onResult(String str) { + // TODO 必须在这里聚合。 + // str的data是个ContractResult + // 在这儿也是返回个ContractResult + try { + LOGGER.debug("a result of contract" + contractID + ": " + str); + JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); + if (obj.has("nodeID")) { + String id = obj.get("nodeID").getAsString(); + if (nodeIDs.contains(id)) { + LOGGER.debug( + "ignored result because the result of node " + + id.substring(0, 5) + + " has been received"); + return; + } + nodeIDs.add(id); + } + + LOGGER.debug( + String.format( + "contractID=%s received=%s order=%d count=%d", + contractID, str, order.get(), count)); + componedContractResult.add(obj); + // 收集到所有结果 + if (order.incrementAndGet() == count) { + ContractResult finalResult = componedContractResult.figureFinalResult(); + finalResult.needSeq = true; + finalResult.seq = request_seq; + + // if (null == finalResult) { + // finalResult = + // new ContractResult( + // ContractResult.Status.Exception, + // new JsonPrimitive( + // "no nore than half of the + // consistent result")); + // originalCallback.onResult(new + // Gson().toJson(finalResult)); + // } else { + originalCallback.onResult(JsonUtil.toJson(finalResult)); + // } + LOGGER.debug( + String.format( + "%d results are the same: %s", + finalResult.size, finalResult.result)); + + // 集群中事务序号+1 + cmActions.getManager().multiContractRecorder + .getMultiContractMeta(contractID) + .nextSeqAtMaster(); + + // recover,其中无状态合约CP出错无需恢复 + Set nodesID = componedContractResult.getProblemNodes(); + if (null == nodesID || nodesID.isEmpty()) { + return; + } + for (String nodeID : nodesID) { + LOGGER.warn("node fails! " + nodeID); + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) + == RecoverFlag.Fine) { + masterServerRecoverMechAction.getRecoverStatusMap() + .get(nodeID) + .put(contractID, RecoverFlag.ToRecover); + } + } + for (String nodeID : nodesID) { + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) + == RecoverFlag.ToRecover) { + LOGGER.warn("node in recover " + nodeID); + + // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 + // 直接通过load别的节点来恢复 + masterServerRecoverMechAction.restartContractFromCommonMode( + nodeID, contractID); + } + } + } + // clearCache(); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("result exception!"); + } + } + } + + @Override + public void onRecover(Map args) { + int ceiLastExeSeq = (int) args.get("ceiLastExeSeq"); + this.setSeq(ceiLastExeSeq + 1); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutorFactory.java b/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutorFactory.java new file mode 100644 index 0000000..28dc488 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutorFactory.java @@ -0,0 +1,22 @@ +package org.bdware.consistency.plugin.pbft; + +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class PBFTExecutorFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "PBFT"; + } + + @Override + public ContractExecutor getInstance(Map args) { + int nodeSize = (int) args.get("nodeSize"); + String contractID = (String) args.get("contractID"); + String masterPubkey = (String) args.get("masterPubkey"); + String[] members = (String[]) args.get("members"); + return new PBFTExecutor(nodeSize, contractID, masterPubkey, members); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java new file mode 100644 index 0000000..7a5af99 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java @@ -0,0 +1,343 @@ +package org.bdware.consistency.plugin.ra; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; +import org.bdware.sc.ComponedContractResult; +import org.bdware.sc.ContractResult; +import org.bdware.sc.bean.ContractExecType; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.MultiContractMeta; +import org.bdware.sc.units.RecoverFlag; +import org.bdware.sc.units.RequestCache; +import org.bdware.sc.units.ResultCache; +import org.bdware.sc.util.JsonUtil; +import org.bdware.sdk.consistency.api.NotifiableResultMerger; +import org.bdware.server.trustedmodel.MultiReqSeq; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class RequestAllExecutor extends AbstractContextContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(RequestAllExecutor.class); + final Object lock = new Object(); + int resultCount; + AtomicInteger request_index = new AtomicInteger(0); + ContractExecType type; + // key为requestID,value为其seq + Map seqMap = new ConcurrentHashMap<>(); + Map resultCache = new ConcurrentHashMap<>(); + // MultiPointContractInfo info; + String contractID; + + public RequestAllExecutor( + ContractExecType t, int c, String con_id) { + type = t; + resultCount = c; + contractID = con_id; + } + + public void setSeq(int seq) { + request_index = new AtomicInteger(seq); + } + + public ResultCallback createResultCallback( + final String requestID, + final ResultCallback originalCb, + final int count, + final int request_seq, + final String contractID) { + ComponedContractResult componedContractResult = new ComponedContractResult(count); + // TODO 加对应的超时? + return networkManager.createResultCallback( + requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); + } + + public void sendRequest(String id, ContractRequest req, ResultCallback collector) { + Map reqStr = new HashMap<>(); + reqStr.put("uniReqID", id); + reqStr.put("data", req); + reqStr.put("action", "executeContractLocally"); + String sendStr = JsonUtil.toJson(reqStr); + + // master负责缓存请求 + if (!masterServerTCPAction.getReqCache().containsKey(contractID)) { + masterServerTCPAction.getReqCache().put(contractID, new RequestCache()); + } + // TODO 多调多统一个seq的有多个请求,这个需要改 + masterServerTCPAction.getReqCache().get(contractID).put(req.seq, sendStr); + + LOGGER.debug(JsonUtil.toJson(req)); + + String[] nodes = + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); + LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); + for (String node : nodes) { + LOGGER.debug("get cmNode " + node.substring(0, 5)); + if (!networkManager.hasAgentConnection(node)) { + LOGGER.warn("cmNode " + node.substring(0, 5) + " is null"); + collector.onResult( + "{\"status\":\"Error\",\"result\":\"node offline\"," + + "\"nodeID\":\"" + + node + + "\"," + + "\"action\":\"onExecuteContractTrustfully\"}"); + } else if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) + != RecoverFlag.Fine) { + collector.onResult( + "{\"status\":\"Error\",\"result\":\"node recovering\"," + + "\"nodeID\":\"" + + node + + "\"," + + "\"action\":\"onExecuteContractTrustfully\"}"); + networkManager.sendToAgent(node, sendStr); + } else { + LOGGER.info("send request to cmNode " + node.substring(0, 5)); + networkManager.sendToAgent(node, sendStr); + } + } + } + + public boolean checkCurNodeNumValid() { + String[] nodes = + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + int validNode = 0; + Map mapResult = new HashMap<>(); + for (String node : nodes) { + mapResult.put(node.substring(0, 5), String.format("%s %s", networkManager.hasAgentConnection(node) + "", + masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID))); + if (networkManager.hasAgentConnection(node) + && masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) + == RecoverFlag.Fine) { + validNode++; + } + } + LOGGER.info(JsonUtil.toPrettyJson(mapResult)); + int c = resultCount; + if (ContractExecType.RequestAllResponseAll.equals(type)) { + c = (int) Math.ceil((double) c / 2); + } + LOGGER.debug("c=" + c + " validNode=" + validNode); + return validNode >= c; + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + LOGGER.debug(JsonUtil.toJson(req)); + MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); + if (meta == null || !meta.isMaster()) { + cmActions.getManager().executeContractOnOtherNodes(req, rc); + return; + } + req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID())); + + // 三个相同requestID进来的时候,会有冲突。 + // 仅在此处有冲突么? + // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 + + // 如果是多点合约的请求,A1、A2、A3的序号应该一致,不能分配一个新的seq,根据requestID判断是否不需要重新分配一个序号 + //TODO seqMap memory leak + //TODO + //TODO + if (null != requestID && requestID.endsWith("_mul")) { + synchronized (lock) { + if (seqMap.containsKey(requestID)) { + req.seq = seqMap.get(requestID).seq; + } else { + req.seq = request_index.getAndIncrement(); + seqMap.put(requestID, new MultiReqSeq(req.seq)); + } + } + } else { + req.seq = request_index.getAndIncrement(); + } + req.needSeq = true; + String id = + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; + LOGGER.info("execute receive requestID=" + requestID + " msgID=" + id); + + if (checkCurNodeNumValid()) { + LOGGER.debug("checkCurNodeNumValid=true"); + ResultCallback collector = + createResultCallback(id, rc, resultCount, req.seq, req.getContractID()); + masterServerTCPAction.getSync().sleep(id, collector); + LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); + sendRequest(id, req, collector); + } else { + LOGGER.debug("invalidNodeNumOnResult"); + request_index.getAndDecrement(); + ContractResult finalResult = + new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("node number unavailable, request refused.")); + rc.onResult(JsonUtil.toJson(finalResult)); + } + + // } + + /* // 三个相同requestID进来的时候,会有冲突。 + // 仅在此处有冲突么? + // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 + req.seq = request_index.getAndIncrement(); + req.needSeq = true; + ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); + MasterServerTCPAction.sync.sleep(id, collector); + sendRequest(id, req, collector);*/ + } + + // 清理缓存的多点合约请求序号 + public void clearCache() { + final long time = System.currentTimeMillis() - 30000L; + seqMap.entrySet() + .removeIf( + entry -> { + MultiReqSeq cache = entry.getValue(); + if (null == cache) { + return true; + } + return cache.startTime < time; + }); + } + + public static class ResultMerger extends ResultCallback implements NotifiableResultMerger { + ComponedContractResult componedContractResult; + AtomicInteger order; + String contractID; + int count; + int request_seq; + ResultCallback originalCallback; + Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 + + ResultMerger( + final ResultCallback originalCb, + final int count, + final int request_seq, + final String contractID) { + originalCallback = originalCb; + this.count = count; + this.request_seq = request_seq; + this.contractID = contractID; + componedContractResult = new ComponedContractResult(count); + order = new AtomicInteger(0); + } + + public String getContractID() { + return contractID; + } + + public String getInfo() { + return "contractID=" + + contractID + + " 收到第 " + + order + + " 个节点回复 : " + + " order=" + + order + + " count=" + + count + + " "; + } + + @Override + public void onResult(String str) { + // TODO 必须在这里聚合。 + // str的data是个ContractResult + // 在这儿也是返回个ContractResult + try { + LOGGER.debug("a result of contract" + contractID + ": " + str); + JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); + if (obj.has("nodeID")) { + String id = obj.get("nodeID").getAsString(); + if (nodeIDs.contains(id)) { + LOGGER.debug( + "ignored result because the result of node " + + id.substring(0, 5) + + " has been received"); + return; + } + nodeIDs.add(id); + } + + LOGGER.debug( + String.format( + "contractID=%s received=%s order=%d count=%d", + contractID, str, order.get(), count)); + componedContractResult.add(obj); + // 收集到所有结果 + if (order.incrementAndGet() == count) { + ContractResult finalResult = componedContractResult.figureFinalResult(); + finalResult.needSeq = true; + finalResult.seq = request_seq; + + // if (null == finalResult) { + // finalResult = + // new ContractResult( + // ContractResult.Status.Exception, + // new JsonPrimitive( + // "no nore than half of the + // consistent result")); + // originalCallback.onResult(new + // Gson().toJson(finalResult)); + // } else { + originalCallback.onResult(JsonUtil.toJson(finalResult)); + // } + LOGGER.debug( + String.format( + "%d results are the same: %s", + finalResult.size, finalResult.result)); + + // 集群中事务序号+1 + cmActions.getManager().multiContractRecorder + .getMultiContractMeta(contractID) + .nextSeqAtMaster(); + + // recover,其中无状态合约CP出错无需恢复 + Set nodesID = componedContractResult.getProblemNodes(); + if (null == nodesID || nodesID.isEmpty()) { + return; + } + for (String nodeID : nodesID) { + LOGGER.warn("node fails! " + nodeID); + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) + == RecoverFlag.Fine) { + masterServerRecoverMechAction.getRecoverStatusMap() + .get(nodeID) + .put(contractID, RecoverFlag.ToRecover); + } + } + for (String nodeID : nodesID) { + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) + == RecoverFlag.ToRecover) { + LOGGER.warn("node in recover " + nodeID); + + // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 + // 直接通过load别的节点来恢复 + masterServerRecoverMechAction.restartContractFromCommonMode( + nodeID, contractID); + } + } + } + // clearCache(); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("result exception!"); + } + } + } + + @Override + public void onRecover(Map args) { + int ceiLastExeSeq = (int) args.get("ceiLastExeSeq"); + this.setSeq(ceiLastExeSeq + 1); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseAllFactory.java b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseAllFactory.java new file mode 100644 index 0000000..cae3d9e --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseAllFactory.java @@ -0,0 +1,21 @@ +package org.bdware.consistency.plugin.ra; + +import org.bdware.sc.bean.ContractExecType; +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class RequestAllResponseAllFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "RARA"; + } + + @Override + public ContractExecutor getInstance(Map args) { + int nodeSize = (int) args.get("nodeSize"); + String contractID = (String) args.get("contractID"); + return new RequestAllExecutor(ContractExecType.RequestAllResponseAll, nodeSize, contractID); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java new file mode 100644 index 0000000..2861757 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java @@ -0,0 +1,21 @@ +package org.bdware.consistency.plugin.ra; + +import org.bdware.sc.bean.ContractExecType; +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class RequestAllResponseFirstFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "RARF"; + } + + @Override + public ContractExecutor getInstance(Map args) { + String contractID = (String) args.get("contractID"); + return new RequestAllExecutor( + ContractExecType.RequestAllResponseFirst, 1, contractID); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java new file mode 100644 index 0000000..56c8210 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java @@ -0,0 +1,24 @@ +package org.bdware.consistency.plugin.ra; + +import org.bdware.sc.bean.ContractExecType; +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class RequestAllResponseHalfFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "RARH"; + } + + @Override + public ContractExecutor getInstance(Map args) { + int nodeSize = (int) args.get("nodeSize"); + String contractID = (String) args.get("contractID"); + return new RequestAllExecutor( + ContractExecType.RequestAllResponseHalf, + nodeSize / 2 + 1, + contractID); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutor.java b/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutor.java new file mode 100644 index 0000000..fc303d3 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutor.java @@ -0,0 +1,63 @@ +package org.bdware.consistency.plugin.ro; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.util.JsonUtil; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class RequestOnceExecutor extends AbstractContextContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(RequestOnceExecutor.class); + String contractID; + AtomicInteger order = new AtomicInteger(0); + + public RequestOnceExecutor(String contractID) { + this.contractID = contractID; + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + ResultCallback cb = + new ResultCallback() { + @Override + public void onResult(String str) { + LOGGER.debug(str); + JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); + JsonObject result = + JsonParser.parseString(jo.get("data").getAsString()) + .getAsJsonObject(); + for (String key : result.keySet()) jo.add(key, result.get(key)); + jo.remove("action"); + jo.addProperty("action", "onExecuteResult"); + LOGGER.debug(jo.toString()); + rc.onResult(jo.toString()); + } + }; + masterServerTCPAction.getSync().sleep(requestID, cb); + String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + for (int i = 0; i < members.length; i++) { + LOGGER.info("[members]:" + members.length); + int size = members.length; + String nodeID = members[order.incrementAndGet() % size]; + //ADD Connect + Map obj = new HashMap<>(); + obj.put("action", "executeContractLocally"); + obj.put("requestID", requestID); + obj.put("data", req); + obj.put("uniReqID", requestID); + networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj)); + return; + } + rc.onResult( + "{\"status\":\"Error\",\"result\":\"all nodes " + + " offline\",\"action\":\"onExecuteContract\"}"); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutorFactory.java b/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutorFactory.java new file mode 100644 index 0000000..1d597f4 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutorFactory.java @@ -0,0 +1,19 @@ +package org.bdware.consistency.plugin.ro; + +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class RequestOnceExecutorFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "RequestOnce"; + } + + @Override + public ContractExecutor getInstance(Map args) { + String contractID = (String) args.get("contractID"); + return new RequestOnceExecutor(contractID); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java b/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java new file mode 100644 index 0000000..824a74c --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java @@ -0,0 +1,80 @@ +package org.bdware.consistency.plugin.ro; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; +import org.bdware.sc.ContractResult; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.util.JsonUtil; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class ResponseOnceExecutor extends AbstractContextContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(ResponseOnceExecutor.class); + private final String contractID; + AtomicInteger order = new AtomicInteger(0); + + public ResponseOnceExecutor(String contractID) { + this.contractID = contractID; + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + executeInternal(requestID, rc, req, 2); + } + + private void executeInternal( + String requestID, ResultCallback rc, ContractRequest req, int count) { + // String contractID = req.getContractID(); + // TODO 标注失效节点,是否选择重新迁移?? + ResultCallback cb = + new ResultCallback() { + @Override + public void onResult(String str) { + LOGGER.debug(str); + JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); + jo.remove("action"); + jo.addProperty("action", "onExecuteResult"); + LOGGER.debug(jo.toString()); + if (jo.has("data")) { + String data = jo.get("data").getAsString(); + ContractResult cr = JsonUtil.fromJson(data, ContractResult.class); + if (cr.status != ContractResult.Status.Success && count > 0) { + executeInternal(requestID, rc, req, count - 1); + } else rc.onResult(jo.toString()); + } else { + JsonObject jo2 = new JsonObject(); + jo2.addProperty("action", "onExecuteResult"); + jo.remove("action"); + jo2.addProperty("data", jo.toString()); + rc.onResult(jo2.toString()); + } + } + }; + masterServerTCPAction.getSync().sleepWithTimeout(requestID, cb, 5); + if (!sendOnce(requestID, req)) + rc.onResult( + "{\"status\":\"Error\",\"data\":\"{\\\"status\\\":\\\"Error\\\",\\\"result\\\":\\\"all nodes offline\\\"}\",\"action\":\"onExecuteContract\"}"); + } + + private boolean sendOnce(String requestID, ContractRequest req) { + String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + for (int i = 0; i < members.length; i++) { + int size = members.length; + String nodeID = members[order.incrementAndGet() % size]; + Map obj = new HashMap<>(); + obj.put("action", "executeContractLocally"); + obj.put("data", req); + obj.put("uniReqID", requestID); + networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj)); + return true; + } + return false; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutorFactory.java b/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutorFactory.java new file mode 100644 index 0000000..8fa8056 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutorFactory.java @@ -0,0 +1,19 @@ +package org.bdware.consistency.plugin.ro; + +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class ResponseOnceExecutorFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "ResponseOnce"; + } + + @Override + public ContractExecutor getInstance(Map args) { + String contractID = (String) args.get("contractID"); + return new ResponseOnceExecutor(contractID); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java new file mode 100644 index 0000000..81abd61 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java @@ -0,0 +1,395 @@ +package org.bdware.consistency.plugin.sharding; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; +import org.bdware.sc.ComponedContractResult; +import org.bdware.sc.ContractMeta; +import org.bdware.sc.ContractResult; +import org.bdware.sc.bean.*; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.MultiContractMeta; +import org.bdware.sc.units.RecoverFlag; +import org.bdware.sc.units.RequestCache; +import org.bdware.sc.units.ResultCache; +import org.bdware.sc.util.JsonUtil; +import org.bdware.server.trustedmodel.MultiReqSeq; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +// 改为MultiPointCooperationExecutor +public class MultiPointCooperationExecutor extends AbstractContextContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(MultiPointCooperationExecutor.class); + final Object lock = new Object(); + int resultCount; + AtomicInteger request_index = new AtomicInteger(0); + ContractExecType type; + // key为requestID,value为其seq + Map seqMap = new ConcurrentHashMap<>(); + Map resultCache = new ConcurrentHashMap<>(); + // MultiPointContractInfo info; + MultiContractMeta multiMeta; + String contractID; + + public MultiPointCooperationExecutor(ContractExecType t, int c, String con_id) { + LOGGER.info("-- sharding executor---"); + type = t; + resultCount = c; + contractID = con_id; + multiMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); + } + + public void setSeq(int seq) { + request_index = new AtomicInteger(seq); + } + + public ResultCallback createResultCallback( + final String requestID, + final ResultCallback originalCb, + final int count, + final int request_seq, + final String contractID, JoinInfo joinInfo) { + // TODO 加对应的超时? + return networkManager.createResultCallback( + requestID, + new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), + count); // 把count改成了1,设置成获得1个响应就行 + } + + public void sendRequest(String id, ContractRequest req, String[] nodes) { + Map reqStr = new HashMap<>(); + reqStr.put("uniReqID", id); + reqStr.put("data", req); + req.needSeq = false; + reqStr.put("action", "executeContractLocally"); + String sendStr = JsonUtil.toJson(reqStr); + // master负责缓存请求 + if (!masterServerTCPAction.getReqCache().containsKey(contractID)) { + masterServerTCPAction.getReqCache().put(contractID, new RequestCache()); + } + // TODO 多调多统一个seq的有多个请求,这个需要改 + masterServerTCPAction.getReqCache().get(contractID).put(req.seq, sendStr); + LOGGER.debug(JsonUtil.toJson(req)); + LOGGER.info("node size = " + nodes.length); + LOGGER.debug("nodes:" + JsonUtil.toJson(nodes)); + for (String node : nodes) { + LOGGER.info( + "[sendRequests] get cmNode " + + node.substring(0, 5) + + " not null " + + "org.bdware.consistency.plugin.ra.RequestAllExecutor 发送请求给 " + + node.substring(0, 5)); + networkManager.sendToAgent(node, sendStr); + } + } + + private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) { + try { + int val; + switch (routeInfo.useDefault) { + case byRequester: + val = + new BigInteger(req.getRequester(), 16) + .mod(new BigInteger("" + members.length)) + .intValue(); + while (val < 0) { + val = val + members.length; + } + return new String[]{members[val]}; + case byArgHash: + val = req.getArg().hashCode(); + val = val % members.length; + while (val < 0) { + val += members.length; + } + return new String[]{members[val]}; + case byTarget: + JsonObject jo = req.getArg().getAsJsonObject(); + val = + new BigInteger(jo.get("target").getAsString(), 16) + .mod(new BigInteger("" + members.length)) + .intValue(); + while (val < 0) { + val = val + members.length; + } + return new String[]{members[val]}; + default: + return members; + } + } catch (Exception e) { + return members; + } + } + + public boolean checkCurNodeNumValid() { + LOGGER.info("checkCurNodeNumValid"); + String[] nodes = multiMeta.getMembers(); + // List nodes = info.members; + int validNode = 0; + for (String node : nodes) { + if (networkManager.hasAgentConnection(node) + && masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) + == RecoverFlag.Fine) { + validNode++; + } + } + int c = resultCount; + if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2); + LOGGER.info("c=" + c + " validNode=" + validNode); + return validNode >= c; + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + LOGGER.info("[org.bdware.consistency.plugin.sharding.MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); + // 获得action 函数名 + LOGGER.info("action is : " + req.getAction()); + req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID())); + if (requestID != null && requestID.endsWith("_mul")) { + synchronized (lock) { + if (seqMap.containsKey(requestID)) { + req.seq = seqMap.get(requestID).seq; + } else { + req.seq = request_index.getAndIncrement(); + seqMap.put(requestID, new MultiReqSeq(req.seq)); + } + } + } else { + req.seq = request_index.getAndIncrement(); + } + req.needSeq = true; + String id = + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; + LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); + if (checkCurNodeNumValid()) { // 校验成功 current node num 合法 + LOGGER.info("checkCurNodeNumValid true"); + ContractMeta meta = + cmActions.getManager().statusRecorder.getContractMeta(req.getContractID()); + FunctionDesp fun = meta.getExportedFunction(req.getAction()); + ResultCallback collector; + // TODO @fanbo 下面的count 1要改,应该是根据route的规则来。 + //Count 根据join规则来。 + //nodes 根据route规则来。 + JoinInfo joinInfo = fun.joinInfo; + RouteInfo routeInfo = fun.routeInfo; + int count = getJoinCount(joinInfo, contractID); + LOGGER.info("requestID=" + requestID + " join Count: " + count); + + String[] members = multiMeta.getMembers(); + String[] nodes = getAccordingToRouteInfo(routeInfo, req, members); + if (nodes.length < count) { + count = nodes.length; + } + collector = + createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 + masterServerTCPAction.getSync().sleep(id, collector); + LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); + sendRequest(id, req, nodes); // 发送请求 + } else { + LOGGER.info("invalidNodeNumOnResult"); + request_index.getAndDecrement(); + ContractResult finalResult = + new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("node number unavailbale,request refused.")); + rc.onResult(JsonUtil.toJson(finalResult)); + } + } + + private int getJoinCount(JoinInfo joinInfo, String contractID) { + if (joinInfo == null) return resultCount; + if (joinInfo.joinCount.isJsonPrimitive() && joinInfo.joinCount.getAsJsonPrimitive().isNumber()) { + return joinInfo.joinCount.getAsJsonPrimitive().getAsInt(); + } + try { + ContractRequest cr = new ContractRequest(); + cr.setContractID(contractID); + cr.setAction(joinInfo.joinCount.getAsString()); + //TODO Arg需要好好设计一下。 + //TODO 又好用又简单的那种设计 + //TODO + cr.setArg(""); + String result = cmActions.getManager().executeLocally(cr, null); + return JsonUtil.parseString(result).getAsJsonObject().get("result").getAsInt(); + } catch (Exception e) { + e.printStackTrace(); + return 1; + } + } + + // 清理缓存的多点合约请求序号 + public void clearCache() { + final long time = System.currentTimeMillis() - 30000L; + seqMap.entrySet() + .removeIf( + entry -> { + MultiReqSeq cache = entry.getValue(); + if (null == cache) { + return true; + } + return cache.startTime < time; + }); + } + + public static class ResultMerger extends ResultCallback { + ComponedContractResult componedContractResult; + AtomicInteger order; + String contractID; + int count; // 记录有多少个节点 + int request_seq; + ResultCallback originalCallback; + Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 + JoinInfo joinInfo; + + ResultMerger( + final ResultCallback originalCb, + final int count, + final int request_seq, + final String contractID, + final JoinInfo joinInfo) { + originalCallback = originalCb; + this.count = count; + this.request_seq = request_seq; + this.contractID = contractID; + componedContractResult = new ComponedContractResult(count); + order = new AtomicInteger(0); + this.joinInfo = joinInfo; + } + + public String getInfo() { + return "contractID=" + + contractID + + " 收到第 " + + order + + " 个节点回复 : " + + " order=" + + order + + " count=" + + count + + " "; + } + + @Override + public void onResult(String str) { + // TODO 必须在这里聚合。 + // str的data是个ContractResult + // 在这儿也是返回个ContractResult + try { + LOGGER.info(str); + JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); + String id = obj.get("nodeID").getAsString(); + if (nodeIDs.contains(id)) { + LOGGER.info("已经收到节点 " + id.substring(0, 5) + " 的结果,该结果被忽略"); + return; + } + nodeIDs.add(id); + LOGGER.info( + "contractID=" + + contractID + + " 收到第 " + + order + + " 个节点回复 : " + + str + + " order=" + + order + + " count=" + + count); + componedContractResult.add(obj); + // 收集到所有结果 + if (order.incrementAndGet() == count) { + ContractResult finalResult = componedContractResult.mergeFinalResult(); + + finalResult.needSeq = true; + finalResult.seq = request_seq; + + // if (null == finalResult) { + // finalResult = + // new ContractResult( + // ContractResult.Status.Exception, + // new JsonPrimitive( + // "no nore than half of the + // consistent result")); + // originalCallback.onResult(new + // Gson().toJson(finalResult)); + // } else { + if (joinInfo != null) { + handleJoinInfo(finalResult, joinInfo); + } + originalCallback.onResult(JsonUtil.toJson(finalResult)); + // } + LOGGER.info( + "本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); + + // 集群中事务序号+1 + // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq(); + cmActions.getManager() + .multiContractRecorder + .getMultiContractMeta(contractID) + .nextSeqAtMaster(); + // recover,其中无状态合约CP出错无需恢复 + Set nodesID = componedContractResult.getProblemNodes(); + if (null == nodesID || nodesID.isEmpty()) { + return; + } + for (String nodeID : nodesID) { + LOGGER.info("结果出现问题的节点有:" + nodeID); + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) + == RecoverFlag.Fine) { + masterServerRecoverMechAction.getRecoverStatusMap() + .get(nodeID) + .put(contractID, RecoverFlag.ToRecover); + } + } + for (String nodeID : nodesID) { + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) + == RecoverFlag.ToRecover) { + LOGGER.info("问题节点开始恢复:" + nodeID); + + // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 + // 直接通过load别的节点来恢复 + masterServerRecoverMechAction.restartContractFromCommonMode( + nodeID, contractID); + } + } + } + // clearCache(); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.info("本次执行最终结果为有异常"); + } + } + + private void handleJoinInfo(ContractResult finalResult, JoinInfo joinInfo) { + JsonObject jo = finalResult.result.getAsJsonObject(); + if (joinInfo != null && joinInfo.joinRule != null) { + //TODO 不应该是double 类型 + switch (joinInfo.joinRule) { + case "add": + double val = 0; + for (String key : jo.keySet()) { + val += jo.get(key).getAsDouble(); + } + finalResult.result = new JsonPrimitive(val); + break; + case "multiply": + val = 1; + for (String key : jo.keySet()) { + val *= jo.get(key).getAsDouble(); + } + finalResult.result = new JsonPrimitive(val); + break; + } + } + } + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutorFactory.java b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutorFactory.java new file mode 100644 index 0000000..b611082 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutorFactory.java @@ -0,0 +1,21 @@ +package org.bdware.consistency.plugin.sharding; + +import org.bdware.sc.bean.ContractExecType; +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class MultiPointCooperationExecutorFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "Sharding"; + } + + @Override + public ContractExecutor getInstance(Map args) { + int nodeSize = (int) args.get("nodeSize"); + String contractID = (String) args.get("contractID"); + return new MultiPointCooperationExecutor(ContractExecType.Sharding, nodeSize, contractID); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java new file mode 100644 index 0000000..3e61606 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java @@ -0,0 +1,308 @@ +package org.bdware.consistency.plugin.sharding; + +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; +import org.bdware.sc.ContractClient; +import org.bdware.sc.ContractManager; +import org.bdware.sc.ContractResult; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.bean.FunctionDesp; +import org.bdware.sc.bean.SM2Verifiable; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.units.MultiContractMeta; +import org.bdware.sc.units.RecoverFlag; +import org.bdware.sc.util.HashUtil; +import org.bdware.sc.util.JsonUtil; + +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +/** + * @author Kaidong Wu + */ +public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(SelfAdaptiveShardingExecutor.class); + private static final int SUBMIT_LIMIT = 1024; + private final Queue reqQueue = new ConcurrentLinkedQueue<>(); + private final MultiContractMeta meta; + private final Map toExecuted = new ConcurrentHashMap<>(); + private final Set executedBlocks = ConcurrentHashMap.newKeySet(); + private final Map executedTxs = new ConcurrentHashMap<>(); + private final Object flag = new Object(); + private final ScheduledFuture future; + private boolean running = true; + private Block b = new Block(); + + public SelfAdaptiveShardingExecutor(String contractID) { + this.meta = + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); + this.future = ContractManager.scheduledThreadPool.scheduleWithFixedDelay( + this::submitBlock, + 2, + 2, + TimeUnit.SECONDS); + LOGGER.debug(String.format("ContractManager.threadPool=%d/%d", + ((ThreadPoolExecutor) ContractManager.threadPool).getActiveCount(), + ((ThreadPoolExecutor) ContractManager.threadPool).getPoolSize())); + ContractManager.threadPool.execute(() -> { + LOGGER.info( + "[SelfAdaptiveShardingExecutor " + meta.getContractID() + "] starting service..." + running); + while (running) { + LOGGER.info("checking blocks to be executed, latest block=" + + this.b.prevHash + ", to be executed size=" + toExecuted.size()); + LOGGER.debug("executed: " + JsonUtil.toJson(executedBlocks) + "\n\t" + JsonUtil.toJson(executedTxs)); + while (!toExecuted.isEmpty()) { + String key = this.b.prevHash; + Block block = toExecuted.get(key); + if (null != block) { + executeBlock(block); + } + toExecuted.remove(key); + } + synchronized (flag) { + try { + flag.wait(); + } catch (InterruptedException e) { + LOGGER.warn(String.format( + "[SelfAdaptiveShardingExecutor %s] waiting is interrupted: %s", + meta.getContractID(), + e.getMessage())); + } + } + } + }); + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { + // check client + ContractClient client = cmActions.getManager().getClient(meta.getContractID()); + if (null == client) { + LOGGER.error("contract " + meta.getContractID() + " not found!"); + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("contract " + meta.getContractID() + " not found!")))); + return; + } + // check function + FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); + if (null == funDesp) { + LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive( + String.format("action %s of contract %s not found!", + req.getAction(), + meta.getContractID()))))); + return; + } + // for view function, execute it + if (funDesp.isView) { + cmActions.getManager().executeLocallyAsync(req, rcb, hcb); + return; + } + // normal function, check if it is in blocks + if (executedTxs.containsKey(requestID)) { + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Error, + new JsonPrimitive("this request has been packed!")))); + return; + } + // add blocks into request cache + LOGGER.debug("receive contract request " + requestID); + executedTxs.put(requestID, false); + reqQueue.add(req); + rcb.onResult(JsonUtil.toJson(new ContractResult( + ContractResult.Status.Executing, + new JsonPrimitive("this request is adding into blocks")))); + // if cache is full, submit + if (reqQueue.size() >= SUBMIT_LIMIT) { + ContractManager.threadPool.execute(this::submitBlock); + } + } + + @Override + public void close() { + // stop threads + this.future.cancel(true); + this.running = false; + LOGGER.info("destruct executor of contract " + meta.getContractID()); + } + + public void execute(String blockStr) { + Block block = JsonUtil.fromJson(blockStr, Block.class); + // the block must have not been cached or executed, and must be valid + if (!toExecuted.containsKey(block.prevHash) && + !executedBlocks.contains(block.hash) && + block.isValid()) { + // add block into block cache + LOGGER.info(String.format( + "[SelfAdaptiveShardingExecutor %s] receive block %s -> %s," + + " %d transactions, timestamp=%d, size=%d", + meta.getContractID(), + block.hash, + block.prevHash, + block.requests.length, + block.timestamp, + blockStr.length())); + toExecuted.put(block.prevHash, block); + // notify thread to execute blocks + synchronized (flag) { + flag.notify(); + } + } + } + + private synchronized void executeBlock(Block block) { + // used for the thread to execute blocks + LOGGER.debug("start"); + // check contract requests, requests must have not been executed + for (ContractRequest request : block.requests) { + if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { + LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); + return; + } + } + // TODO check status + // executed requests + for (ContractRequest request : block.requests) { + String ret = cmActions.getManager().executeLocally(request, null); + LOGGER.debug(String.format( + "[SelfAdaptiveShardingExecutor %s] result of request %s: %s", + meta.getContractID(), + request.getRequestID(), + ret)); + executedTxs.put(request.getRequestID(), true); + } + LOGGER.info(String.format( + "[SelfAdaptiveShardingExecutor %s] execute %d transactions of block %s", + meta.getContractID(), + block.requests.length, + block.hash)); + // TODO create check point + this.b = new Block(block.hash, this.b.height + 1); + executedBlocks.add(block.hash); + } + + private void submitBlock() { + Block block = fillBlock(); + if (null != block) { + LOGGER.info("deliver block " + block.hash + "..."); + LOGGER.debug(JsonUtil.toPrettyJson(block)); + String[] nodes = this.meta.getMembers(); + JsonObject req = new JsonObject(); + req.addProperty("action", "deliverBlock"); + req.addProperty("data", JsonUtil.toJson(block)); + req.addProperty("contractID", this.meta.getContractID()); + String reqStr = req.toString(); + // deliver blocks + for (String node : nodes) { + if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(this.meta.getContractID()) + == RecoverFlag.Fine) { + networkManager.sendToAgent(node, reqStr); + } + } + } + } + + private synchronized Block fillBlock() { + // pack contract requests into a block + ContractRequest[] requests = new ContractRequest[Math.min(reqQueue.size(), SUBMIT_LIMIT)]; + if (requests.length == 0) { + return null; + } + for (int i = 0; i < requests.length; ++i) { + requests[i] = reqQueue.poll(); + } + this.b.fillBlock(requests); + return this.b; + } + + static class Block extends SM2Verifiable { + String prevHash = "0"; + String hash; + int height; + String checkPoint; + String body; + String nodePubKey; + ContractRequest[] requests; + long timestamp; + + public Block() { + this.height = 0; + } + + public Block(String prev, int height) { + this.prevHash = prev; + this.height = height; + } + + public void fillBlock(ContractRequest[] requests) { + this.requests = requests; + this.timestamp = System.currentTimeMillis(); + this.body = merkle(requests); + this.hash = computeHash(); + doSignature(cmActions.getManager().nodeCenterConn.getNodeKeyPair()); + } + + public boolean isValid() { + return computeHash().equals(hash) && body.equals(merkle(this.requests)) && verifySignature(); + } + + private String computeHash() { + return HashUtil.sha3( + String.valueOf(this.height), + this.prevHash, + this.checkPoint, + this.body); + } + + private String merkle(ContractRequest[] requests) { + // manage requests as a merkle tree + if (requests.length == 0) { + return null; + } + if (requests.length == 1) { + return HashUtil.sha3(requests[0].getRequestID()); + } + Queue reqQueue = + Arrays.stream(requests).map(ContractRequest::getRequestID) + .collect(Collectors.toCollection(ArrayDeque::new)); + do { + int size; + for (size = reqQueue.size(); size > 1; size -= 2) { + reqQueue.add(HashUtil.sha3(reqQueue.poll(), reqQueue.poll())); + } + if (size == 1) { + reqQueue.add(reqQueue.poll()); + } + } while (1 != reqQueue.size()); + return reqQueue.poll(); + } + + @Override + public String getPublicKey() { + return nodePubKey; + } + + @Override + public void setPublicKey(String pubkey) { + this.nodePubKey = pubkey; + } + + @Override + public String getContentStr() { + return this.hash; + } + } + + @Override + public void onDeliverBlock(String data) { + execute(data); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutorFactory.java b/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutorFactory.java new file mode 100644 index 0000000..61e4154 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutorFactory.java @@ -0,0 +1,19 @@ +package org.bdware.consistency.plugin.sharding; + +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class SelfAdaptiveShardingExecutorFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "SASharding"; + } + + @Override + public ContractExecutor getInstance(Map args) { + String contractID = (String) args.get("contractID"); + return new SelfAdaptiveShardingExecutor(contractID); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/single/SingleNodeExecutorFactory.java b/src/main/java/org/bdware/consistency/plugin/single/SingleNodeExecutorFactory.java new file mode 100644 index 0000000..0eaa660 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/single/SingleNodeExecutorFactory.java @@ -0,0 +1,24 @@ +package org.bdware.consistency.plugin.single; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; +import org.bdware.server.trustedmodel.SingleNodeExecutor; + +import java.util.Map; + +public class SingleNodeExecutorFactory implements ContractExecutorFactory { + private static final Logger LOGGER = LogManager.getLogger(SingleNodeExecutorFactory.class); + + @Override + public String getExecutorName() { + return "Sole"; + } + + @Override + public ContractExecutor getInstance(Map args) { + LOGGER.info("Sole contract is not supported in multi-point mode"); + return SingleNodeExecutor.instance; + } +}