diff --git a/build.gradle b/build.gradle index 99b4753..0bc7378 100644 --- a/build.gradle +++ b/build.gradle @@ -2,6 +2,8 @@ plugins { id 'java' } +apply from: '../spotless.gradle' + dependencies { implementation project(":consistency-sdk") diff --git a/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java b/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java index 1785741..1e28915 100644 --- a/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/common/AbstractContextContractExecutor.java @@ -7,8 +7,12 @@ import org.bdware.server.trustedmodel.ContractExecutor; public abstract class AbstractContextContractExecutor implements ContractExecutor { static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf(); static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions(); - static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager(); - static protected IMasterClientTCPAction masterClientTCPAction = ConsistencyPluginManager.getContext().getMasterClientTCPAction(); - static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction(); - static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction(); + static protected INetworkManager networkManager = + ConsistencyPluginManager.getContext().getNetworkManager(); + static protected IMasterClientTCPAction masterClientTCPAction = + ConsistencyPluginManager.getContext().getMasterClientTCPAction(); + static protected IMasterServerTCPAction masterServerTCPAction = + ConsistencyPluginManager.getContext().getMasterServerTCPAction(); + static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = + ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction(); } diff --git a/src/main/java/org/bdware/consistency/plugin/pbft/ContractCluster.java b/src/main/java/org/bdware/consistency/plugin/pbft/ContractCluster.java index 48fdada..a848774 100644 --- a/src/main/java/org/bdware/consistency/plugin/pbft/ContractCluster.java +++ b/src/main/java/org/bdware/consistency/plugin/pbft/ContractCluster.java @@ -25,7 +25,8 @@ public class ContractCluster implements TrustfulExecutorConnection { jo.addProperty("action", "contractSyncMessage"); jo.addProperty("contractID", contractID); jo.addProperty("data", ByteUtil.encodeBASE64(msg)); - ConsistencyPluginManager.getInstance().getContext().getNetworkManager().sendToAgent(node.pubkey, jo.toString()); + ConsistencyPluginManager.getInstance().getContext().getNetworkManager() + .sendToAgent(node.pubkey, jo.toString()); } @Override diff --git a/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java b/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java index 48a4d0d..7eaec6a 100644 --- a/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java @@ -26,7 +26,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -//TODO 追赶差下的调用 +// TODO 追赶差下的调用 public class PBFTExecutor extends AbstractContextContractExecutor { private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class); final Object lock = new Object(); @@ -37,14 +37,13 @@ public class PBFTExecutor extends AbstractContextContractExecutor { // key为requestID,value为其seq Map seqMap = new ConcurrentHashMap<>(); Map resultCache = new ConcurrentHashMap<>(); - // MultiPointContractInfo info; + // MultiPointContractInfo info; String contractID; PBFTAlgorithm pbft; ContractCluster contractCluster; boolean isMaster; - public PBFTExecutor( - int c, String con_id, final String masterPubkey, String[] members) { + public PBFTExecutor(int c, String con_id, final String masterPubkey, String[] members) { resultCount = c; contractID = con_id; this.members = new ArrayList<>(); @@ -65,7 +64,8 @@ public class PBFTExecutor extends AbstractContextContractExecutor { } contractCluster = new ContractCluster(contractID, this.members); pbft.setConnection(contractCluster); - final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); + final MultiContractMeta cei = + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); pbft.setCommitter(new Committer() { @Override public void onCommit(ContractRequest data) { @@ -100,23 +100,20 @@ public class PBFTExecutor extends AbstractContextContractExecutor { pbft.setAtomSeq(request_index.get()); } - public ResultCallback createResultCallback( - final String requestID, - final ResultCallback originalCb, - final int count, - final int request_seq, + public ResultCallback createResultCallback(final String requestID, + final ResultCallback originalCb, final int count, final int request_seq, final String contractID) { ComponedContractResult componedContractResult = new ComponedContractResult(count); // TODO 加对应的超时? - return networkManager.createResultCallback( - requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); + return networkManager.createResultCallback(requestID, + new ResultMerger(originalCb, count, request_seq, contractID), count); } public void sendRequest(String id, ContractRequest req, ResultCallback collector) { -// Map reqStr = new HashMap<>(); -// reqStr.put("uniReqID", id); -// reqStr.put("data", req); -// reqStr.put("action", "executeContractLocally"); + // Map reqStr = new HashMap<>(); + // reqStr.put("uniReqID", id); + // reqStr.put("data", req); + // reqStr.put("action", "executeContractLocally"); ContractRequest cr2 = ContractRequest.parse(req.toByte()); cr2.setRequestID(id); PBFTMessage request = new PBFTMessage(); @@ -127,20 +124,17 @@ public class PBFTExecutor extends AbstractContextContractExecutor { if (!networkManager.hasAgentConnection(node.pubkey)) { LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null"); collector.onResult( - "{\"status\":\"Error\",\"result\":\"node offline\"," - + "\"nodeID\":\"" - + node - + "\"," - + "\"action\":\"onExecuteContractTrustfully\"}"); -// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) -// != RecoverFlag.Fine) { -// collector.onResult( -// "{\"status\":\"Error\",\"result\":\"node recovering\"," -// + "\"nodeID\":\"" -// + node -// + "\"," -// + "\"action\":\"onExecuteContractTrustfully\"}"); -// contractCluster.sendMessage(node, request.getBytes()); + "{\"status\":\"Error\",\"result\":\"node offline\"," + "\"nodeID\":\"" + + node + "\"," + "\"action\":\"onExecuteContractTrustfully\"}"); + // } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) + // != RecoverFlag.Fine) { + // collector.onResult( + // "{\"status\":\"Error\",\"result\":\"node recovering\"," + // + "\"nodeID\":\"" + // + node + // + "\"," + // + "\"action\":\"onExecuteContractTrustfully\"}"); + // contractCluster.sendMessage(node, request.getBytes()); } else { contractCluster.sendMessage(node, request.getBytes()); } @@ -150,8 +144,8 @@ public class PBFTExecutor extends AbstractContextContractExecutor { masterServerTCPAction.getReqCache().put(contractID, new RequestCache()); } // TODO 多调多统一个seq的有多个请求,这个需要改 - String[] nodes = - cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + String[] nodes = cmActions.getManager().multiContractRecorder + .getMultiContractMeta(contractID).getMembers(); LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); @@ -163,9 +157,11 @@ public class PBFTExecutor extends AbstractContextContractExecutor { } @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + public void execute(String requestID, ContractRequest req, ResultCallback rc, + OnHashCallback hcb) { LOGGER.debug(JsonUtil.toJson(req)); - MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); + MultiContractMeta meta = cmActions.getManager().multiContractRecorder + .getMultiContractMeta(req.getContractID()); if (meta == null || !meta.isMaster()) { cmActions.getManager().executeContractOnOtherNodes(req, rc); return; @@ -176,9 +172,9 @@ public class PBFTExecutor extends AbstractContextContractExecutor { // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 // 如果是多点合约的请求,A1、A2、A3的序号应该一致,不能分配一个新的seq,根据requestID判断是否不需要重新分配一个序号 - //TODO seqMap memory leak - //TODO - //TODO + // TODO seqMap memory leak + // TODO + // TODO if (null != requestID && requestID.endsWith("_mul")) { synchronized (lock) { if (seqMap.containsKey(requestID)) { @@ -206,37 +202,32 @@ public class PBFTExecutor extends AbstractContextContractExecutor { } else { LOGGER.debug("invalidNodeNumOnResult"); request_index.getAndDecrement(); - ContractResult finalResult = - new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive("node number unavailable, request refused.")); + ContractResult finalResult = new ContractResult(ContractResult.Status.Error, + new JsonPrimitive("node number unavailable, request refused.")); rc.onResult(JsonUtil.toJson(finalResult)); } // } - /* // 三个相同requestID进来的时候,会有冲突。 - // 仅在此处有冲突么? - // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 - req.seq = request_index.getAndIncrement(); - req.needSeq = true; - ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); - MasterServerTCPAction.sync.sleep(id, collector); - sendRequest(id, req, collector);*/ + /* + * // 三个相同requestID进来的时候,会有冲突。 // 仅在此处有冲突么? // + * 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 req.seq = + * request_index.getAndIncrement(); req.needSeq = true; ResultCallback collector = + * createResultCallback(id, rc, resultCount, req.getContractID()); + * MasterServerTCPAction.sync.sleep(id, collector); sendRequest(id, req, collector); + */ } // 清理缓存的多点合约请求序号 public void clearCache() { final long time = System.currentTimeMillis() - 30000L; - seqMap.entrySet() - .removeIf( - entry -> { - MultiReqSeq cache = entry.getValue(); - if (null == cache) { - return true; - } - return cache.startTime < time; - }); + seqMap.entrySet().removeIf(entry -> { + MultiReqSeq cache = entry.getValue(); + if (null == cache) { + return true; + } + return cache.startTime < time; + }); } public static class ResultMerger extends ResultCallback { @@ -248,10 +239,7 @@ public class PBFTExecutor extends AbstractContextContractExecutor { ResultCallback originalCallback; Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 - ResultMerger( - final ResultCallback originalCb, - final int count, - final int request_seq, + ResultMerger(final ResultCallback originalCb, final int count, final int request_seq, final String contractID) { originalCallback = originalCb; this.count = count; @@ -266,16 +254,8 @@ public class PBFTExecutor extends AbstractContextContractExecutor { } public String getInfo() { - return "contractID=" - + contractID - + " 收到第 " - + order - + " 个节点回复 : " - + " order=" - + order - + " count=" - + count - + " "; + return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order + + " count=" + count + " "; } @Override @@ -289,19 +269,15 @@ public class PBFTExecutor extends AbstractContextContractExecutor { if (obj.has("nodeID")) { String id = obj.get("nodeID").getAsString(); if (nodeIDs.contains(id)) { - LOGGER.debug( - "ignored result because the result of node " - + id.substring(0, 5) - + " has been received"); + LOGGER.debug("ignored result because the result of node " + + id.substring(0, 5) + " has been received"); return; } nodeIDs.add(id); } - LOGGER.debug( - String.format( - "contractID=%s received=%s order=%d count=%d", - contractID, str, order.get(), count)); + LOGGER.debug(String.format("contractID=%s received=%s order=%d count=%d", + contractID, str, order.get(), count)); componedContractResult.add(obj); // 收集到所有结果 if (order.incrementAndGet() == count) { @@ -309,26 +285,23 @@ public class PBFTExecutor extends AbstractContextContractExecutor { finalResult.needSeq = true; finalResult.seq = request_seq; - // if (null == finalResult) { - // finalResult = - // new ContractResult( - // ContractResult.Status.Exception, - // new JsonPrimitive( - // "no nore than half of the + // if (null == finalResult) { + // finalResult = + // new ContractResult( + // ContractResult.Status.Exception, + // new JsonPrimitive( + // "no nore than half of the // consistent result")); - // originalCallback.onResult(new + // originalCallback.onResult(new // Gson().toJson(finalResult)); - // } else { + // } else { originalCallback.onResult(JsonUtil.toJson(finalResult)); - // } - LOGGER.debug( - String.format( - "%d results are the same: %s", - finalResult.size, finalResult.result)); + // } + LOGGER.debug(String.format("%d results are the same: %s", finalResult.size, + finalResult.result)); // 集群中事务序号+1 - cmActions.getManager().multiContractRecorder - .getMultiContractMeta(contractID) + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID) .nextSeqAtMaster(); // recover,其中无状态合约CP出错无需恢复 @@ -338,26 +311,25 @@ public class PBFTExecutor extends AbstractContextContractExecutor { } for (String nodeID : nodesID) { LOGGER.warn("node fails! " + nodeID); - if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) - == RecoverFlag.Fine) { - masterServerRecoverMechAction.getRecoverStatusMap() - .get(nodeID) + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) + .get(contractID) == RecoverFlag.Fine) { + masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) .put(contractID, RecoverFlag.ToRecover); } } for (String nodeID : nodesID) { - if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) - == RecoverFlag.ToRecover) { + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) + .get(contractID) == RecoverFlag.ToRecover) { LOGGER.warn("node in recover " + nodeID); // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 // 直接通过load别的节点来恢复 - masterServerRecoverMechAction.restartContractFromCommonMode( - nodeID, contractID); + masterServerRecoverMechAction.restartContractFromCommonMode(nodeID, + contractID); } } } - // clearCache(); + // clearCache(); } catch (Exception e) { e.printStackTrace(); LOGGER.warn("result exception!"); diff --git a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java index 2aabfd4..880418b 100644 --- a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java @@ -36,11 +36,10 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { // key为requestID,value为其seq Map seqMap = new ConcurrentHashMap<>(); Map resultCache = new ConcurrentHashMap<>(); - // MultiPointContractInfo info; + // MultiPointContractInfo info; String contractID; - public RequestAllExecutor( - ContractExecType t, int c, String con_id) { + public RequestAllExecutor(ContractExecType t, int c, String con_id) { type = t; resultCount = c; contractID = con_id; @@ -50,16 +49,13 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { request_index = new AtomicInteger(seq); } - public ResultCallback createResultCallback( - final String requestID, - final ResultCallback originalCb, - final int count, - final int request_seq, + public ResultCallback createResultCallback(final String requestID, + final ResultCallback originalCb, final int count, final int request_seq, final String contractID) { ComponedContractResult componedContractResult = new ComponedContractResult(count); // TODO 加对应的超时? - return networkManager.createResultCallback( - requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); + return networkManager.createResultCallback(requestID, + new ResultMerger(originalCb, count, request_seq, contractID), count); } public void sendRequest(String id, ContractRequest req, ResultCallback collector) { @@ -78,8 +74,8 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { LOGGER.debug(JsonUtil.toJson(req)); - String[] nodes = - cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + String[] nodes = cmActions.getManager().multiContractRecorder + .getMultiContractMeta(contractID).getMembers(); LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); for (String node : nodes) { @@ -87,19 +83,13 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { if (!networkManager.hasAgentConnection(node)) { LOGGER.warn("cmNode " + node.substring(0, 5) + " is null"); collector.onResult( - "{\"status\":\"Error\",\"result\":\"node offline\"," - + "\"nodeID\":\"" - + node - + "\"," - + "\"action\":\"onExecuteContractTrustfully\"}"); - } else if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) - != RecoverFlag.Fine) { + "{\"status\":\"Error\",\"result\":\"node offline\"," + "\"nodeID\":\"" + + node + "\"," + "\"action\":\"onExecuteContractTrustfully\"}"); + } else if (masterServerRecoverMechAction.getRecoverStatusMap().get(node) + .get(contractID) != RecoverFlag.Fine) { collector.onResult( - "{\"status\":\"Error\",\"result\":\"node recovering\"," - + "\"nodeID\":\"" - + node - + "\"," - + "\"action\":\"onExecuteContractTrustfully\"}"); + "{\"status\":\"Error\",\"result\":\"node recovering\"," + "\"nodeID\":\"" + + node + "\"," + "\"action\":\"onExecuteContractTrustfully\"}"); networkManager.sendToAgent(node, sendStr); } else { LOGGER.info("send request to cmNode " + node.substring(0, 5)); @@ -109,16 +99,16 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { } public boolean checkCurNodeNumValid() { - String[] nodes = - cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + String[] nodes = cmActions.getManager().multiContractRecorder + .getMultiContractMeta(contractID).getMembers(); int validNode = 0; Map mapResult = new HashMap<>(); for (String node : nodes) { - mapResult.put(node.substring(0, 5), String.format("%s %s", networkManager.hasAgentConnection(node) + "", + mapResult.put(node.substring(0, 5), String.format("%s %s", + networkManager.hasAgentConnection(node) + "", masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID))); - if (networkManager.hasAgentConnection(node) - && masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) - == RecoverFlag.Fine) { + if (networkManager.hasAgentConnection(node) && masterServerRecoverMechAction + .getRecoverStatusMap().get(node).get(contractID) == RecoverFlag.Fine) { validNode++; } } @@ -132,9 +122,11 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { } @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + public void execute(String requestID, ContractRequest req, ResultCallback rc, + OnHashCallback hcb) { LOGGER.debug(JsonUtil.toJson(req)); - MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); + MultiContractMeta meta = cmActions.getManager().multiContractRecorder + .getMultiContractMeta(req.getContractID()); if (meta == null || !meta.isMaster()) { cmActions.getManager().executeContractOnOtherNodes(req, rc); return; @@ -146,9 +138,9 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 // 如果是多点合约的请求,A1、A2、A3的序号应该一致,不能分配一个新的seq,根据requestID判断是否不需要重新分配一个序号 - //TODO seqMap memory leak - //TODO - //TODO + // TODO seqMap memory leak + // TODO + // TODO if (null != requestID && requestID.endsWith("_mul")) { synchronized (lock) { if (seqMap.containsKey(requestID)) { @@ -176,37 +168,32 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { } else { LOGGER.debug("invalidNodeNumOnResult"); request_index.getAndDecrement(); - ContractResult finalResult = - new ContractResult( - ContractResult.Status.Error, - new JsonPrimitive("node number unavailable, request refused.")); + ContractResult finalResult = new ContractResult(ContractResult.Status.Error, + new JsonPrimitive("node number unavailable, request refused.")); rc.onResult(JsonUtil.toJson(finalResult)); } // } - /* // 三个相同requestID进来的时候,会有冲突。 - // 仅在此处有冲突么? - // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 - req.seq = request_index.getAndIncrement(); - req.needSeq = true; - ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); - MasterServerTCPAction.sync.sleep(id, collector); - sendRequest(id, req, collector);*/ + /* + * // 三个相同requestID进来的时候,会有冲突。 // 仅在此处有冲突么? // + * 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 req.seq = + * request_index.getAndIncrement(); req.needSeq = true; ResultCallback collector = + * createResultCallback(id, rc, resultCount, req.getContractID()); + * MasterServerTCPAction.sync.sleep(id, collector); sendRequest(id, req, collector); + */ } // 清理缓存的多点合约请求序号 public void clearCache() { final long time = System.currentTimeMillis() - 30000L; - seqMap.entrySet() - .removeIf( - entry -> { - MultiReqSeq cache = entry.getValue(); - if (null == cache) { - return true; - } - return cache.startTime < time; - }); + seqMap.entrySet().removeIf(entry -> { + MultiReqSeq cache = entry.getValue(); + if (null == cache) { + return true; + } + return cache.startTime < time; + }); } public static class ResultMerger extends ResultCallback implements NotifiableResultMerger { @@ -218,10 +205,7 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { ResultCallback originalCallback; Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 - public ResultMerger( - final ResultCallback originalCb, - final int count, - final int request_seq, + public ResultMerger(final ResultCallback originalCb, final int count, final int request_seq, final String contractID) { originalCallback = originalCb; this.count = count; @@ -236,16 +220,8 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { } public String getInfo() { - return "contractID=" - + contractID - + " 收到第 " - + order - + " 个节点回复 : " - + " order=" - + order - + " count=" - + count - + " "; + return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order + + " count=" + count + " "; } @Override @@ -259,19 +235,15 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { if (obj.has("nodeID")) { String id = obj.get("nodeID").getAsString(); if (nodeIDs.contains(id)) { - LOGGER.debug( - "ignored result because the result of node " - + id.substring(0, 5) - + " has been received"); + LOGGER.debug("ignored result because the result of node " + + id.substring(0, 5) + " has been received"); return; } nodeIDs.add(id); } - LOGGER.debug( - String.format( - "contractID=%s received=%s order=%d count=%d", - contractID, str, order.get(), count)); + LOGGER.debug(String.format("contractID=%s received=%s order=%d count=%d", + contractID, str, order.get(), count)); componedContractResult.add(obj); // 收集到所有结果 if (order.incrementAndGet() == count) { @@ -279,26 +251,23 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { finalResult.needSeq = true; finalResult.seq = request_seq; - // if (null == finalResult) { - // finalResult = - // new ContractResult( - // ContractResult.Status.Exception, - // new JsonPrimitive( - // "no nore than half of the + // if (null == finalResult) { + // finalResult = + // new ContractResult( + // ContractResult.Status.Exception, + // new JsonPrimitive( + // "no nore than half of the // consistent result")); - // originalCallback.onResult(new + // originalCallback.onResult(new // Gson().toJson(finalResult)); - // } else { + // } else { originalCallback.onResult(JsonUtil.toJson(finalResult)); - // } - LOGGER.debug( - String.format( - "%d results are the same: %s", - finalResult.size, finalResult.result)); + // } + LOGGER.debug(String.format("%d results are the same: %s", finalResult.size, + finalResult.result)); // 集群中事务序号+1 - cmActions.getManager().multiContractRecorder - .getMultiContractMeta(contractID) + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID) .nextSeqAtMaster(); // recover,其中无状态合约CP出错无需恢复 @@ -308,26 +277,25 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { } for (String nodeID : nodesID) { LOGGER.warn("node fails! " + nodeID); - if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) - == RecoverFlag.Fine) { - masterServerRecoverMechAction.getRecoverStatusMap() - .get(nodeID) + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) + .get(contractID) == RecoverFlag.Fine) { + masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) .put(contractID, RecoverFlag.ToRecover); } } for (String nodeID : nodesID) { - if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) - == RecoverFlag.ToRecover) { + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) + .get(contractID) == RecoverFlag.ToRecover) { LOGGER.warn("node in recover " + nodeID); // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 // 直接通过load别的节点来恢复 - masterServerRecoverMechAction.restartContractFromCommonMode( - nodeID, contractID); + masterServerRecoverMechAction.restartContractFromCommonMode(nodeID, + contractID); } } } - // clearCache(); + // clearCache(); } catch (Exception e) { e.printStackTrace(); LOGGER.warn("result exception!"); diff --git a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java index 2861757..0248444 100644 --- a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java +++ b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseFirstFactory.java @@ -15,7 +15,6 @@ public class RequestAllResponseFirstFactory implements ContractExecutorFactory { @Override public ContractExecutor getInstance(Map args) { String contractID = (String) args.get("contractID"); - return new RequestAllExecutor( - ContractExecType.RequestAllResponseFirst, 1, contractID); + return new RequestAllExecutor(ContractExecType.RequestAllResponseFirst, 1, contractID); } } diff --git a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java index 56c8210..61a0088 100644 --- a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java +++ b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllResponseHalfFactory.java @@ -16,9 +16,7 @@ public class RequestAllResponseHalfFactory implements ContractExecutorFactory { public ContractExecutor getInstance(Map args) { int nodeSize = (int) args.get("nodeSize"); String contractID = (String) args.get("contractID"); - return new RequestAllExecutor( - ContractExecType.RequestAllResponseHalf, - nodeSize / 2 + 1, + return new RequestAllExecutor(ContractExecType.RequestAllResponseHalf, nodeSize / 2 + 1, contractID); } } diff --git a/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutor.java b/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutor.java index c3f8032..af1da0a 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutor.java @@ -39,7 +39,8 @@ public class RAFTExecutor extends AbstractContextContractExecutor { members.add(pubkeyNode); } contractCluster = new ContractCluster(contractID, members); - final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); + final MultiContractMeta cei = + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); raft = new RaftAlgorithm(config, selfNode, contractCluster, new Committer() { @Override public void onCommit(ContractRequest request) { @@ -63,7 +64,7 @@ public class RAFTExecutor extends AbstractContextContractExecutor { } }); - //TODO 在改变角色时需要通知node cluster + // TODO 在改变角色时需要通知node cluster if (cei.isMaster()) { raft.convertToLeader(); } else { @@ -72,9 +73,11 @@ public class RAFTExecutor extends AbstractContextContractExecutor { } @Override - public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { + public void execute(String requestID, ContractRequest req, ResultCallback rcb, + OnHashCallback hcb) { LOGGER.debug(JsonUtil.toJson(req)); - MultiContractMeta multiContractMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); + MultiContractMeta multiContractMeta = cmActions.getManager().multiContractRecorder + .getMultiContractMeta(req.getContractID()); if (multiContractMeta == null || !multiContractMeta.isMaster()) { cmActions.getManager().executeContractOnOtherNodes(req, rcb); return; @@ -82,8 +85,9 @@ public class RAFTExecutor extends AbstractContextContractExecutor { ResultCallback collector; int count = contractCluster.getNodes().size(); - collector = networkManager.createResultCallback( - requestID, new RequestAllExecutor.ResultMerger(rcb, count, req.seq, req.getContractID()), count); + collector = networkManager.createResultCallback(requestID, + new RequestAllExecutor.ResultMerger(rcb, count, req.seq, req.getContractID()), + count); masterServerTCPAction.getSync().sleep(requestID, collector); raft.insertLogEntry(req); } diff --git a/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutorFactory.java b/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutorFactory.java index 7b9a37d..2d1f659 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutorFactory.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutorFactory.java @@ -1,6 +1,5 @@ package org.bdware.consistency.plugin.raft; -import org.bdware.consistency.plugin.pbft.PBFTExecutor; import org.bdware.sdk.consistency.api.ContractExecutorFactory; import org.bdware.server.trustedmodel.ContractExecutor; @@ -14,9 +13,9 @@ public class RAFTExecutorFactory implements ContractExecutorFactory { @Override public ContractExecutor getInstance(Map args) { -// int nodeSize = (int) args.get("nodeSize"); + // int nodeSize = (int) args.get("nodeSize"); String contractID = (String) args.get("contractID"); String[] memberPubKeys = (String[]) args.get("members"); return new RAFTExecutor(contractID, memberPubKeys); } -} \ No newline at end of file +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftAlgorithm.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftAlgorithm.java index 65a6564..65e60e5 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftAlgorithm.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftAlgorithm.java @@ -19,11 +19,14 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class RaftAlgorithm implements CommitAlgorithm { - private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(RaftAlgorithm.class); + private static final Logger LOGGER = + org.apache.logging.log4j.LogManager.getLogger(RaftAlgorithm.class); private ContractCluster connection; Committer committer; - private enum State {LEADER, FOLLOWER, CANDIDATE} + private enum State { + LEADER, FOLLOWER, CANDIDATE + } State state; PubKeyNode leader; @@ -52,14 +55,15 @@ public class RaftAlgorithm implements CommitAlgorithm { TimerTask heartbeatTimerTask; long lastActivated; - //config + // config RaftConfig config; // Locks and Conditions private Lock lock = new ReentrantLock(); - public RaftAlgorithm(RaftConfig config, PubKeyNode node, ContractCluster conn, Committer committer) { + public RaftAlgorithm(RaftConfig config, PubKeyNode node, ContractCluster conn, + Committer committer) { this.config = config; self = node; connection = conn; @@ -117,7 +121,8 @@ public class RaftAlgorithm implements CommitAlgorithm { case RequestVote: response = onRequestVote((RequestVote) message); persist(); - connection.sendMessage((PubKeyNode) sender, new RaftMessageParser(response).getBytes()); + connection.sendMessage((PubKeyNode) sender, + new RaftMessageParser(response).getBytes()); break; case RequestVoteResponse: onRequestVoteResponse((RequestVoteResponse) message); @@ -125,7 +130,8 @@ public class RaftAlgorithm implements CommitAlgorithm { case AppendEntries: response = onAppendEntries((AppendEntries) message); persist(); - connection.sendMessage((PubKeyNode) sender, new RaftMessageParser(response).getBytes()); + connection.sendMessage((PubKeyNode) sender, + new RaftMessageParser(response).getBytes()); break; case AppendEntriesResponse: onAppendEntriesResponse((PubKeyNode) sender, (AppendEntriesResponse) message); @@ -143,14 +149,16 @@ public class RaftAlgorithm implements CommitAlgorithm { // in lock private RequestVoteResponse onRequestVote(RequestVote request) { if (request.getTerm() < currentTerm) { - LOGGER.debug("Rejected RequestVote message in term {}" + - ", because the message is stale. My term is {}.", request.getTerm(), currentTerm); + LOGGER.debug( + "Rejected RequestVote message in term {}" + + ", because the message is stale. My term is {}.", + request.getTerm(), currentTerm); return new RequestVoteResponse(currentTerm, false); } if (request.getTerm() == currentTerm) { if (votedFor != null && !votedFor.pubkey.equals(request.getCandidate().pubkey)) { - LOGGER.debug("Rejected RequestVote message in term {}" + - ", because I have voted for another server. ", request.getTerm()); + LOGGER.debug("Rejected RequestVote message in term {}" + + ", because I have voted for another server. ", request.getTerm()); return new RequestVoteResponse(currentTerm, false); } } else { @@ -179,7 +187,8 @@ public class RaftAlgorithm implements CommitAlgorithm { leader = null; votedFor = null; convertToFollower(); - } else if (state == State.CANDIDATE && response.getTerm() == currentTerm && response.isGranted()) { + } else if (state == State.CANDIDATE && response.getTerm() == currentTerm + && response.isGranted()) { ++grantedVoteCount; if (grantedVoteCount > servers.size() / 2) { convertToLeader(); @@ -192,8 +201,10 @@ public class RaftAlgorithm implements CommitAlgorithm { private AppendEntriesResponse onAppendEntries(AppendEntries request) { LogEntry lastEntry = logManager.getLastEntry(); if (request.getTerm() < currentTerm) { - LOGGER.debug("Rejected AppendEntries message in term {}" + - ", because the message is stale. My term is {}.", request.getTerm(), currentTerm); + LOGGER.debug( + "Rejected AppendEntries message in term {}" + + ", because the message is stale. My term is {}.", + request.getTerm(), currentTerm); return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); } if (request.getTerm() > currentTerm) { @@ -207,8 +218,9 @@ public class RaftAlgorithm implements CommitAlgorithm { LOGGER.info("new leader={}", leader.pubkey); } if (!leader.pubkey.equals(request.getLeader().pubkey)) { - LOGGER.warn("Another peer={} declares that it is the leader " + - "at term={} which was occupied by leader={}", + LOGGER.warn( + "Another peer={} declares that it is the leader " + + "at term={} which was occupied by leader={}", request.getLeader().pubkey, request.getTerm(), leader.pubkey); return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); } @@ -218,7 +230,8 @@ public class RaftAlgorithm implements CommitAlgorithm { if (entry == null) { return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); } - if (entry.getTerm() != request.getTerm() || logManager.getLastEntryIndex() > request.getPrevLogIndex()) { + if (entry.getTerm() != request.getTerm() + || logManager.getLastEntryIndex() > request.getPrevLogIndex()) { logManager.deleteEntriesStartAt(request.getPrevLogIndex()); lastEntry = logManager.getLastEntry(); } @@ -229,9 +242,11 @@ public class RaftAlgorithm implements CommitAlgorithm { logManager.append(newEntries); lastEntry = logManager.getLastEntry(); if (request.getLeaderCommit() > commitIndex) { - commitIndex = Math.min(request.getLeaderCommit(), newEntries.get(newEntries.size() - 1).getIndex()); + commitIndex = Math.min(request.getLeaderCommit(), + newEntries.get(newEntries.size() - 1).getIndex()); } - LOGGER.info("RECEIVE APPENDENTRY: commitIndex=" + commitIndex + " lastApplied:" + lastApplied); + LOGGER.info( + "RECEIVE APPENDENTRY: commitIndex=" + commitIndex + " lastApplied:" + lastApplied); while (commitIndex > lastApplied) { applyToStateMachine(lastApplied + 1); lastApplied++; @@ -248,9 +263,11 @@ public class RaftAlgorithm implements CommitAlgorithm { convertToFollower(); } else if (state == State.LEADER && response.getTerm() == currentTerm) { if (response.isSuccess()) { - if (matchIndex.containsKey(sender.pubkey) && matchIndex.get(sender.pubkey) > response.getLastLogIndex()) { + if (matchIndex.containsKey(sender.pubkey) + && matchIndex.get(sender.pubkey) > response.getLastLogIndex()) { LOGGER.warn("node={} matchIndex decrease! prev matchIndex={}, lastLogIndex={}", - sender.pubkey, matchIndex.get(sender.pubkey), response.getLastLogIndex()); + sender.pubkey, matchIndex.get(sender.pubkey), + response.getLastLogIndex()); throw new RuntimeException("matchIndex error"); } matchIndex.put(sender.pubkey, response.getLastLogIndex()); @@ -286,7 +303,8 @@ public class RaftAlgorithm implements CommitAlgorithm { if (lastLogIndex >= index) { sendAppendEntries(node, index); } else { - AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), lastEntry.getTerm(), commitIndex, null); + AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), + lastEntry.getTerm(), commitIndex, null); connection.sendMessage(node, new RaftMessageParser(req).getBytes()); } }); @@ -310,7 +328,8 @@ public class RaftAlgorithm implements CommitAlgorithm { private void sendAppendEntries(PubKeyNode follower, long logIndex) { LogEntry prevLog = logManager.getEntry(logIndex - 1); List entries = logManager.getEntriesStartAt(logIndex); - AppendEntries request = new AppendEntries(currentTerm, leader, prevLog.getIndex(), prevLog.getTerm(), commitIndex, entries); + AppendEntries request = new AppendEntries(currentTerm, leader, prevLog.getIndex(), + prevLog.getTerm(), commitIndex, entries); connection.sendMessage(follower, new RaftMessageParser(request).getBytes()); } @@ -342,8 +361,9 @@ public class RaftAlgorithm implements CommitAlgorithm { else nextIndex.put(server.pubkey, lastEntry.getIndex() + 1); matchIndex.put(server.pubkey, 0L); - // AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), lastEntry.getTerm(), commitIndex, null); - // connection.sendMessage(server,new RaftMessageParser(req).getBytes()); + // AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), + // lastEntry.getTerm(), commitIndex, null); + // connection.sendMessage(server,new RaftMessageParser(req).getBytes()); } } } @@ -379,7 +399,8 @@ public class RaftAlgorithm implements CommitAlgorithm { private boolean isServerValid(PubKeyNode server) { - if (server == null) return false; + if (server == null) + return false; List nodes = connection.getNodes(); for (PubKeyNode node : nodes) { if (node.pubkey.equals(server.pubkey)) { diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntries.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntries.java index f7608a9..23b651a 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntries.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntries.java @@ -23,7 +23,8 @@ public class AppendEntries implements IRaftMessage { this.type = RaftMessageType.AppendEntries; } - public AppendEntries(long term, PubKeyNode leader, long prevLogIndex, long prevLogTerm, long leaderCommit, List entries) { + public AppendEntries(long term, PubKeyNode leader, long prevLogIndex, long prevLogTerm, + long leaderCommit, List entries) { this.type = RaftMessageType.AppendEntries; this.term = term; this.leader = leader; diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/EmptyMessage.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/EmptyMessage.java index 2ade4c1..7eb2c52 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/EmptyMessage.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/EmptyMessage.java @@ -1,10 +1,10 @@ package org.bdware.consistency.plugin.raft.algo.message; -public class EmptyMessage implements IRaftMessage{ +public class EmptyMessage implements IRaftMessage { RaftMessageType type; public EmptyMessage() { - type=RaftMessageType.Unknown; + type = RaftMessageType.Unknown; } @Override diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageType.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageType.java index af0d17f..3e11302 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageType.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageType.java @@ -1,11 +1,7 @@ package org.bdware.consistency.plugin.raft.algo.message; public enum RaftMessageType { - RequestVote(0), - RequestVoteResponse(1), - AppendEntries(2), - AppendEntriesResponse(3), - Unknown(4); + RequestVote(0), RequestVoteResponse(1), AppendEntries(2), AppendEntriesResponse(3), Unknown(4); private final int type; diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogEntry.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogEntry.java index b89cd14..40d5889 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogEntry.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogEntry.java @@ -67,7 +67,8 @@ public class LogEntry { int dataLen = ByteUtil.readInt(bi); if (dataLen > 0) { byte[] dataByte = ByteUtil.readBytes(bi, dataLen); - entry.command = JsonUtil.fromJson(new String(dataByte, StandardCharsets.UTF_8), ContractRequest.class); + entry.command = JsonUtil.fromJson(new String(dataByte, StandardCharsets.UTF_8), + ContractRequest.class); } else { entry.command = null; } diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogManager.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogManager.java index 9f4bda4..744813d 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogManager.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogManager.java @@ -11,10 +11,14 @@ import java.util.*; public class LogManager { - private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(LogManager.class); + private static final Logger LOGGER = + org.apache.logging.log4j.LogManager.getLogger(LogManager.class); private final String logDir; - private final TreeMap segmentStartIndexMap = new TreeMap<>(); // key is start index, value is file name of the segment + private final TreeMap segmentStartIndexMap = new TreeMap<>(); // key is start + // index, value is + // file name of the + // segment private int maxSegmentFileSize; private Segment lastSegment; @@ -62,7 +66,8 @@ public class LogManager { if (index >= lastSegment.getStartIndex() && index <= lastSegment.getEndIndex()) { return lastSegment.getEntry(index); } - Segment segment = Segment.loadSegment(logDir, segmentStartIndexMap.floorEntry(index).getValue()); + Segment segment = + Segment.loadSegment(logDir, segmentStartIndexMap.floorEntry(index).getValue()); return segment.getEntry(index); } @@ -70,16 +75,17 @@ public class LogManager { long newLastLogIndex = this.getLastEntryIndex(); newLastLogIndex++; byte[] entryBytes = entry.getBytes(); - LOGGER.info("AppendLogEntry: last=" + this.getLastEntryIndex() + " -> toAdd=" + entry.getIndex()); + LOGGER.info("AppendLogEntry: last=" + this.getLastEntryIndex() + " -> toAdd=" + + entry.getIndex()); try { - if (segmentStartIndexMap.isEmpty() || lastSegment == null || - lastSegment.getFileSize() + entryBytes.length > maxSegmentFileSize) { + if (segmentStartIndexMap.isEmpty() || lastSegment == null + || lastSegment.getFileSize() + entryBytes.length > maxSegmentFileSize) { // close file and rename if (lastSegment != null) { FileUtil.closeFile(lastSegment.getRandomAccessFile()); - String newFileName = String.format("%016x-%016x", - lastSegment.getStartIndex(), lastSegment.getEndIndex()); + String newFileName = String.format("%016x-%016x", lastSegment.getStartIndex(), + lastSegment.getEndIndex()); String newFullFileName = logDir + File.separator + newFileName; File newFile = new File(newFullFileName); String oldFullFileName = logDir + File.separator + lastSegment.getFileName(); @@ -160,7 +166,8 @@ public class LogManager { } private void fillMap(File dir) { - if (dir == null) return; + if (dir == null) + return; File[] files = dir.listFiles(); if (files == null) { LOGGER.warn("get file in dir({}) error", dir.getName()); diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/Segment.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/Segment.java index 9a29b4b..1dca98f 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/Segment.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/Segment.java @@ -1,18 +1,17 @@ package org.bdware.consistency.plugin.raft.algo.storage; -import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.Logger; import org.bdware.consistency.plugin.raft.algo.util.FileUtil; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; public class Segment { - private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(Segment.class); + private static final Logger LOGGER = + org.apache.logging.log4j.LogManager.getLogger(Segment.class); private long startIndex; private long endIndex; private long fileSize; diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/StateManager.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/StateManager.java index fd93f63..a6a5298 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/StateManager.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/StateManager.java @@ -7,6 +7,7 @@ public class StateManager { String filename; static long currentTerm; static PubKeyNode node; + public static long loadCurrentTerm() { return currentTerm; } diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/util/FileUtil.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/util/FileUtil.java index 5ceb835..bd551f8 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/util/FileUtil.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/util/FileUtil.java @@ -2,20 +2,18 @@ package org.bdware.consistency.plugin.raft.algo.util; import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.Logger; -import org.bdware.consistency.plugin.raft.algo.message.IRaftMessage; import org.bdware.consistency.plugin.raft.algo.storage.LogEntry; -import org.bdware.consistency.plugin.raft.algo.storage.Segment; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; -import java.lang.reflect.Method; import java.nio.channels.FileChannel; import java.util.zip.CRC32; public class FileUtil { - private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(FileUtil.class); + private static final Logger LOGGER = + org.apache.logging.log4j.LogManager.getLogger(FileUtil.class); public static RandomAccessFile createFile(String dir, String fileName, String mode) { try { diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/util/HeartBeatUtil.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/util/HeartBeatUtil.java index d96e2ff..17b4913 100644 --- a/src/main/java/org/bdware/consistency/plugin/raft/algo/util/HeartBeatUtil.java +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/util/HeartBeatUtil.java @@ -39,9 +39,8 @@ public class HeartBeatUtil { public synchronized void schedule(TimerTask timerTask, int delay, int period) { try { if (!recordedFuture.containsKey(timerTask)) { - ScheduledFuture future = - ContractManager.scheduledThreadPool.scheduleWithFixedDelay( - timerTask, delay, period, TimeUnit.MILLISECONDS); + ScheduledFuture future = ContractManager.scheduledThreadPool + .scheduleWithFixedDelay(timerTask, delay, period, TimeUnit.MILLISECONDS); recordedFuture.put(timerTask, future); } } catch (Exception e) { diff --git a/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutor.java b/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutor.java index fc303d3..21ae81d 100644 --- a/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/ro/RequestOnceExecutor.java @@ -24,30 +24,31 @@ public class RequestOnceExecutor extends AbstractContextContractExecutor { } @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { - ResultCallback cb = - new ResultCallback() { - @Override - public void onResult(String str) { - LOGGER.debug(str); - JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); - JsonObject result = - JsonParser.parseString(jo.get("data").getAsString()) - .getAsJsonObject(); - for (String key : result.keySet()) jo.add(key, result.get(key)); - jo.remove("action"); - jo.addProperty("action", "onExecuteResult"); - LOGGER.debug(jo.toString()); - rc.onResult(jo.toString()); - } - }; + public void execute(String requestID, ContractRequest req, ResultCallback rc, + OnHashCallback hcb) { + ResultCallback cb = new ResultCallback() { + @Override + public void onResult(String str) { + LOGGER.debug(str); + JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); + JsonObject result = + JsonParser.parseString(jo.get("data").getAsString()).getAsJsonObject(); + for (String key : result.keySet()) + jo.add(key, result.get(key)); + jo.remove("action"); + jo.addProperty("action", "onExecuteResult"); + LOGGER.debug(jo.toString()); + rc.onResult(jo.toString()); + } + }; masterServerTCPAction.getSync().sleep(requestID, cb); - String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + String[] members = cmActions.getManager().multiContractRecorder + .getMultiContractMeta(contractID).getMembers(); for (int i = 0; i < members.length; i++) { LOGGER.info("[members]:" + members.length); int size = members.length; String nodeID = members[order.incrementAndGet() % size]; - //ADD Connect + // ADD Connect Map obj = new HashMap<>(); obj.put("action", "executeContractLocally"); obj.put("requestID", requestID); @@ -56,8 +57,7 @@ public class RequestOnceExecutor extends AbstractContextContractExecutor { networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj)); return; } - rc.onResult( - "{\"status\":\"Error\",\"result\":\"all nodes " - + " offline\",\"action\":\"onExecuteContract\"}"); + rc.onResult("{\"status\":\"Error\",\"result\":\"all nodes " + + " offline\",\"action\":\"onExecuteContract\"}"); } } diff --git a/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java b/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java index 824a74c..49a59ee 100644 --- a/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/ro/ResponseOnceExecutor.java @@ -25,38 +25,39 @@ public class ResponseOnceExecutor extends AbstractContextContractExecutor { } @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { + public void execute(String requestID, ContractRequest req, ResultCallback rc, + OnHashCallback hcb) { executeInternal(requestID, rc, req, 2); } - private void executeInternal( - String requestID, ResultCallback rc, ContractRequest req, int count) { - // String contractID = req.getContractID(); + private void executeInternal(String requestID, ResultCallback rc, ContractRequest req, + int count) { + // String contractID = req.getContractID(); // TODO 标注失效节点,是否选择重新迁移?? - ResultCallback cb = - new ResultCallback() { - @Override - public void onResult(String str) { - LOGGER.debug(str); - JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); - jo.remove("action"); - jo.addProperty("action", "onExecuteResult"); - LOGGER.debug(jo.toString()); - if (jo.has("data")) { - String data = jo.get("data").getAsString(); - ContractResult cr = JsonUtil.fromJson(data, ContractResult.class); - if (cr.status != ContractResult.Status.Success && count > 0) { - executeInternal(requestID, rc, req, count - 1); - } else rc.onResult(jo.toString()); - } else { - JsonObject jo2 = new JsonObject(); - jo2.addProperty("action", "onExecuteResult"); - jo.remove("action"); - jo2.addProperty("data", jo.toString()); - rc.onResult(jo2.toString()); - } - } - }; + ResultCallback cb = new ResultCallback() { + @Override + public void onResult(String str) { + LOGGER.debug(str); + JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); + jo.remove("action"); + jo.addProperty("action", "onExecuteResult"); + LOGGER.debug(jo.toString()); + if (jo.has("data")) { + String data = jo.get("data").getAsString(); + ContractResult cr = JsonUtil.fromJson(data, ContractResult.class); + if (cr.status != ContractResult.Status.Success && count > 0) { + executeInternal(requestID, rc, req, count - 1); + } else + rc.onResult(jo.toString()); + } else { + JsonObject jo2 = new JsonObject(); + jo2.addProperty("action", "onExecuteResult"); + jo.remove("action"); + jo2.addProperty("data", jo.toString()); + rc.onResult(jo2.toString()); + } + } + }; masterServerTCPAction.getSync().sleepWithTimeout(requestID, cb, 5); if (!sendOnce(requestID, req)) rc.onResult( @@ -64,7 +65,8 @@ public class ResponseOnceExecutor extends AbstractContextContractExecutor { } private boolean sendOnce(String requestID, ContractRequest req) { - String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); + String[] members = cmActions.getManager().multiContractRecorder + .getMultiContractMeta(contractID).getMembers(); for (int i = 0; i < members.length; i++) { int size = members.length; String nodeID = members[order.incrementAndGet() % size]; diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java index b519f67..8855e3f 100644 --- a/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/sharding/MultiPointCooperationExecutor.java @@ -53,9 +53,14 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut request_index = new AtomicInteger(seq); } - public ResultCallback createResultCallback(final String requestID, final ResultCallback originalCb, final int count, final int request_seq, final String contractID, JoinInfo joinInfo) { + public ResultCallback createResultCallback(final String requestID, + final ResultCallback originalCb, final int count, final int request_seq, + final String contractID, JoinInfo joinInfo) { // TODO 加对应的超时? - return networkManager.createResultCallback(requestID, new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), count); // 把count改成了1,设置成获得1个响应就行 + return networkManager.createResultCallback(requestID, + new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, + contractID, joinInfo), + count); // 把count改成了1,设置成获得1个响应就行 } public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) { @@ -78,56 +83,65 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut if (node.equals(globalConf.getNodeID())) { masterClientTCPAction.asyncExecuteContractLocally(jo, rc); } else { - // LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + "RequestAllExecutor 发送请求给 " + node.substring(0, 5)); + // LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + + // "RequestAllExecutor 发送请求给 " + node.substring(0, 5)); networkManager.sendToAgent(node, sendStr); } } } - public static T invokeFunctionWithoutLimit(String contractID, String funcName, Class t, JsonElement... args) { + public static T invokeFunctionWithoutLimit(String contractID, String funcName, Class t, + JsonElement... args) { ContractClient client = cmActions.getManager().getClient(contractID); JsonObject arg = new JsonObject(); arg.addProperty("funcName", funcName); JsonArray funcArgs = new JsonArray(); - if (args != null && args.length > 0) for (JsonElement je : args) - funcArgs.add(je); + if (args != null && args.length > 0) + for (JsonElement je : args) + funcArgs.add(je); arg.add("funcArgs", funcArgs); - String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString()); + String routeResultStr = + client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString()); T routeResult = JsonUtil.fromJson(routeResultStr, t); return routeResult; } - private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) { + private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, + String[] members) { try { int val; if (routeInfo.useDefault == null) { // func myFunc (sourceArg, requester) - JsonElement requester = req.getRequester() == null ? JsonNull.INSTANCE : new JsonPrimitive(req.getRequester()); - return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, String[].class, req.getArg(), requester); + JsonElement requester = req.getRequester() == null ? JsonNull.INSTANCE + : new JsonPrimitive(req.getRequester()); + return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, + String[].class, req.getArg(), requester); } switch (routeInfo.useDefault) { case byRequester: - val = new BigInteger(req.getRequester(), 16).mod(new BigInteger("" + members.length)).intValue(); + val = new BigInteger(req.getRequester(), 16) + .mod(new BigInteger("" + members.length)).intValue(); while (val < 0 && members.length > 0) { val = val + members.length; } - return new String[]{members[val]}; + return new String[] {members[val]}; case byArgHash: val = req.getArg().hashCode(); val = val % members.length; while (val < 0 && members.length > 0) { val += members.length; } - return new String[]{members[val]}; + return new String[] {members[val]}; case byJsonPropHash: JsonElement jo = tryLoadJsonProp(req, routeInfo.param); val = jo.toString().hashCode() % members.length; while (val < 0 && members.length > 0) { val += members.length; } - return new String[]{members[val]}; + return new String[] {members[val]}; case byShardingIDTree: - return byShardingIDTree(members, meta.contract.shardingId, Integer.valueOf(routeInfo.param)); + return byShardingIDTree(members, meta.contract.shardingId, + Integer.valueOf(routeInfo.param)); default: return members; } @@ -146,11 +160,12 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut System.arraycopy(members, from, ret, 0, to - from); return ret; } else - return new String[]{}; + return new String[] {}; } else { childCount = Math.abs(childCount); - if (shardingId > 0) shardingId = (shardingId - 1) / childCount; - return new String[]{members[shardingId]}; + if (shardingId > 0) + shardingId = (shardingId - 1) / childCount; + return new String[] {members[shardingId]}; } } @@ -161,7 +176,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut JsonObject arg; if (req.getArg().isJsonPrimitive()) { arg = JsonUtil.parseString(req.getArg().getAsString()).getAsJsonObject(); - } else arg = req.getArg().getAsJsonObject(); + } else + arg = req.getArg().getAsJsonObject(); if (!param.contains(".")) { return arg.get(param); } else { @@ -183,21 +199,23 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut int validNode = 0; for (String node : nodes) { if (networkManager.hasAgentConnection(node)) { - //&& masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) - // == RecoverFlag.Fine + // && masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) + // == RecoverFlag.Fine validNode++; } } int c = resultCount; - if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2); + if (type == ContractExecType.Sharding) + c = (int) Math.ceil((double) c / 2); return validNode >= c; } @Override - public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { - //LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); + public void execute(String requestID, ContractRequest req, ResultCallback rc, + OnHashCallback hcb) { + // LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); // 获得action 函数名 - // LOGGER.info("action is : " + req.getAction()); + // LOGGER.info("action is : " + req.getAction()); req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID())); if (requestID != null && requestID.endsWith("_mul")) { synchronized (lock) { @@ -212,15 +230,17 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut req.seq = request_index.getAndIncrement(); } req.needSeq = true; - String id = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; - //LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); + String id = + System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; + // LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); // 校验成功 current node num 合法 - //LOGGER.info("checkCurNodeNumValid true"); - ContractMeta meta = cmActions.getManager().statusRecorder.getContractMeta(req.getContractID()); + // LOGGER.info("checkCurNodeNumValid true"); + ContractMeta meta = + cmActions.getManager().statusRecorder.getContractMeta(req.getContractID()); FunctionDesp fun = meta.getExportedFunction(req.getAction()); ResultCallback collector; - //Count 根据join规则来。 - //nodes 根据route规则来。 + // Count 根据join规则来。 + // nodes 根据route规则来。 JoinInfo joinInfo = fun.joinInfo; RouteInfo routeInfo = fun.routeInfo; int count = getJoinCount(joinInfo, contractID, req); @@ -230,28 +250,35 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut if (nodes.length < count) { count = nodes.length; } - //LOGGER.info("requestID=" + requestID + " join Count: " + count + " nodeCount:" + nodes.length); + // LOGGER.info("requestID=" + requestID + " join Count: " + count + " nodeCount:" + + // nodes.length); if (count > 0) { collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); // 初始化结果收集器 masterServerTCPAction.getSync().sleep(id, collector); - // LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); + // LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); sendRequest(id, req, nodes, collector); // 发送请求 } else { - ContractResult cr = new ContractResult(ContractResult.Status.Exception, new JsonPrimitive("broadcast 0")); + ContractResult cr = new ContractResult(ContractResult.Status.Exception, + new JsonPrimitive("broadcast 0")); rc.onResult(JsonUtil.toJson(cr)); } } - private int getJoinCount(JoinInfo joinInfo, String contractID, ContractRequest contractRequest) { + private int getJoinCount(JoinInfo joinInfo, String contractID, + ContractRequest contractRequest) { try { - if (joinInfo == null) return resultCount; + if (joinInfo == null) + return resultCount; if (joinInfo.joinCountFuncName != null) { - Integer count = invokeFunctionWithoutLimit(contractID, joinInfo.joinCountFuncName, Integer.class, new JsonPrimitive(contractRequest.getRequester()), contractRequest.getArg()); + Integer count = invokeFunctionWithoutLimit(contractID, joinInfo.joinCountFuncName, + Integer.class, new JsonPrimitive(contractRequest.getRequester()), + contractRequest.getArg()); return count; } - if (joinInfo.joinCount != 0) return joinInfo.joinCount; + if (joinInfo.joinCount != 0) + return joinInfo.joinCount; } catch (Exception e) { e.printStackTrace(); } @@ -281,7 +308,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 JoinInfo joinInfo; - ResultMerger(final ResultCallback originalCb, final int count, final int request_seq, final String contractID, final JoinInfo joinInfo) { + ResultMerger(final ResultCallback originalCb, final int count, final int request_seq, + final String contractID, final JoinInfo joinInfo) { originalCallback = originalCb; this.count = count; this.request_seq = request_seq; @@ -292,7 +320,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } public String getInfo() { - return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order + " count=" + count + " "; + return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order + + " count=" + count + " "; } @Override @@ -301,13 +330,13 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut // str的data是个ContractResult // 在这儿也是返回个ContractResult try { - //LOGGER.info(str); + // LOGGER.info(str); JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); String id; if (obj.get("nodeID") == null) { id = "null"; - // LOGGER.info("已经收到第" + order + "节点的结果,无节点ID! 该结果被忽略" + order); + // LOGGER.info("已经收到第" + order + "节点的结果,无节点ID! 该结果被忽略" + order); return; } else id = obj.get("nodeID").getAsString(); @@ -316,7 +345,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut return; } nodeIDs.add(id); - //LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " order=" + order + " count=" + count); + // LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " + // order=" + order + " count=" + count); componedContractResult.add(obj); // 收集到所有结果 if (order.incrementAndGet() == count) { @@ -324,26 +354,28 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut finalResult.needSeq = true; finalResult.seq = request_seq; - // if (null == finalResult) { - // finalResult = - // new ContractResult( - // ContractResult.Status.Exception, - // new JsonPrimitive( - // "no nore than half of the + // if (null == finalResult) { + // finalResult = + // new ContractResult( + // ContractResult.Status.Exception, + // new JsonPrimitive( + // "no nore than half of the // consistent result")); - // originalCallback.onResult(new + // originalCallback.onResult(new // Gson().toJson(finalResult)); - // } else { + // } else { if (joinInfo != null) { handleJoinInfo(finalResult, joinInfo); } originalCallback.onResult(JsonUtil.toJson(finalResult)); - // } - //LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); + // } + // LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + + // finalResult.result); // 集群中事务序号+1 // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq(); - cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).nextSeqAtMaster(); + cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID) + .nextSeqAtMaster(); // recover,其中无状态合约CP出错无需恢复 Set nodesID = componedContractResult.getProblemNodes(); if (null == nodesID || nodesID.isEmpty()) { @@ -351,21 +383,25 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut } for (String nodeID : nodesID) { LOGGER.info("结果出现问题的节点有:" + nodeID); - if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) == RecoverFlag.Fine) { - masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).put(contractID, RecoverFlag.ToRecover); + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) + .get(contractID) == RecoverFlag.Fine) { + masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) + .put(contractID, RecoverFlag.ToRecover); } } for (String nodeID : nodesID) { - if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) == RecoverFlag.ToRecover) { + if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID) + .get(contractID) == RecoverFlag.ToRecover) { LOGGER.info("问题节点开始恢复:" + nodeID); // 因为该节点结果有误,所以即时是stableMode也认为trans记录不可信 // 直接通过load别的节点来恢复 - masterServerRecoverMechAction.restartContractFromCommonMode(nodeID, contractID); + masterServerRecoverMechAction.restartContractFromCommonMode(nodeID, + contractID); } } } - // clearCache(); + // clearCache(); } catch (Exception e) { e.printStackTrace(); LOGGER.info("本次执行最终结果为有异常"); @@ -378,7 +414,9 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut if (joinInfo != null) { if (joinInfo.useDefault == null) { if (joinInfo.joinFuncName != null) { - JsonElement ret = MultiPointCooperationExecutor.invokeFunctionWithoutLimit(contractID, joinInfo.joinFuncName, JsonElement.class, jo); + JsonElement ret = MultiPointCooperationExecutor + .invokeFunctionWithoutLimit(contractID, joinInfo.joinFuncName, + JsonElement.class, jo); finalResult.result = ret; } return; diff --git a/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java b/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java index a6d7ee2..4661006 100644 --- a/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/sharding/SelfAdaptiveShardingExecutor.java @@ -43,22 +43,18 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto public SelfAdaptiveShardingExecutor(String contractID) { this.meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); - this.future = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay( - this::submitBlock, - DELAY, - DELAY, - TimeUnit.SECONDS); + this.future = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(this::submitBlock, + DELAY, DELAY, TimeUnit.SECONDS); Executors.newCachedThreadPool().execute(() -> { - LOGGER.info(String.format( - "[Executor %s] starting executing service... %b", + LOGGER.info(String.format("[Executor %s] starting executing service... %b", meta.getContractID(), meta.isMaster())); while (running) { LOGGER.info(String.format( "[Executor %s] checking blocks to be executed, latest height=%s, to be executed size=%d", meta.getContractID(), executorPointer, toExecuted.size())); - for (Block block = toExecuted.get(executorPointer); - null != block; - executorPointer += 1, block = toExecuted.get(executorPointer)) { + for (Block block = + toExecuted.get(executorPointer); null != block; executorPointer += + 1, block = toExecuted.get(executorPointer)) { executeBlock(block); toExecuted.remove(executorPointer); } @@ -67,8 +63,7 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto executorFlag.wait(); } } catch (InterruptedException e) { - LOGGER.warn(String.format( - "[Executor %s] waiting is interrupted: %s", + LOGGER.warn(String.format("[Executor %s] waiting is interrupted: %s", meta.getContractID(), e.getMessage())); } } @@ -85,7 +80,8 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto } @Override - public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { + public void execute(String requestID, ContractRequest req, ResultCallback rcb, + OnHashCallback hcb) { // check client ContractClient client = cmActions.getManager().getClient(meta.getContractID()); if (null == client) { @@ -97,10 +93,10 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto // check function FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); if (null == funDesp) { - LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); + LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + + " not found!"); rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, - new JsonPrimitive(String.format( - "action %s of contract %s not found!", + new JsonPrimitive(String.format("action %s of contract %s not found!", req.getAction(), meta.getContractID()))))); return; } @@ -145,19 +141,12 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto Block block = JsonUtil.fromJson(blockStr, Block.class); // the block must have not been cached or executed, and must be valid boolean valid = block.isValid(); - if (!toExecuted.containsKey(block.height) && - block.height >= executorPointer && - valid) { + if (!toExecuted.containsKey(block.height) && block.height >= executorPointer && valid) { // add block into block cache LOGGER.info(String.format( "[Executor %s] receive block [%d] %s -> %s, %d transactions, timestamp=%d, size=%d", - meta.getContractID(), - block.height, - block.hash, - block.prevHash, - block.requests.length, - block.timestamp, - blockStr.length())); + meta.getContractID(), block.height, block.hash, block.prevHash, + block.requests.length, block.timestamp, blockStr.length())); toExecuted.put(block.height, block); // notify thread to execute blocks synchronized (executorFlag) { @@ -165,10 +154,7 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto } } else { LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b valid %b", - block.height, - block.hash, - toExecuted.containsKey(block.height), - valid)); + block.height, block.hash, toExecuted.containsKey(block.height), valid)); } } @@ -177,25 +163,25 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto LOGGER.debug("start"); // check contract requests, requests must have not been executed for (ContractRequest request : block.requests) { - if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { + if (executedTxs.containsKey(request.getRequestID()) + && executedTxs.get(request.getRequestID())) { LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); return; } } // TODO check status if (null != block.checkPoint && !block.checkPoint.isEmpty()) { - cmActions.getManager().recoverUnitFromCheckPoint(meta.getContractID(), block.checkPoint); + cmActions.getManager().recoverUnitFromCheckPoint(meta.getContractID(), + block.checkPoint); } // executed requests for (ContractRequest request : block.requests) { String ret = cmActions.getManager().executeLocally(request, null); - LOGGER.debug(String.format( - "[Executor %s] result of request %s: %s", + LOGGER.debug(String.format("[Executor %s] result of request %s: %s", meta.getContractID(), request.getRequestID(), ret)); executedTxs.put(request.getRequestID(), true); } - LOGGER.info(String.format( - "[Executor %s] execute %d transactions of block [%d] %s", + LOGGER.info(String.format("[Executor %s] execute %d transactions of block [%d] %s", meta.getContractID(), block.requests.length, block.height, block.hash)); } @@ -212,16 +198,18 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto req.addProperty("contractID", this.meta.getContractID()); String reqStr = req.toString(); // deliver blocks - LOGGER.info("[Executor " + meta.getContractID() + "] deliver block " + block.hash + "..."); + LOGGER.info( + "[Executor " + meta.getContractID() + "] deliver block " + block.hash + "..."); String myself = cmActions.getManager().nodeCenterConn.getNodeId(); this.onDeliverBlock(blockStr); for (String node : nodes) { // TODO: find dead lock here -// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) -// == RecoverFlag.Fine) { -// LOGGER.info("deliver block " + block.hash + " to node " + node); -// NetworkManager.instance.sendToAgent(node, reqStr); -// } + // if + // (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) + // == RecoverFlag.Fine) { + // LOGGER.info("deliver block " + block.hash + " to node " + node); + // NetworkManager.instance.sendToAgent(node, reqStr); + // } if (!Objects.equals(myself, node)) { networkManager.sendToAgent(node, reqStr); } @@ -278,18 +266,19 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto public boolean isValid() { boolean hashValid = computeHash().equals(hash), -// bodyValid = body.equals(merkle(this.requests)), - bodyValid = true, - signValid = verifySignature(); + // bodyValid = body.equals(merkle(this.requests)), + bodyValid = true, signValid = verifySignature(); boolean ret = hashValid & bodyValid & signValid; if (!ret) { - LOGGER.warn(String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid)); + LOGGER.warn( + String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid)); } return ret; } private String computeHash() { - return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint, this.body); + return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint, + this.body); } private String merkle(ContractRequest[] requests) { @@ -300,9 +289,8 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto if (requests.length == 1) { return HashUtil.sha3(requests[0].getRequestID()); } - Queue merkleQueue = - Arrays.stream(requests).map(ContractRequest::getRequestID) - .collect(Collectors.toCollection(ArrayDeque::new)); + Queue merkleQueue = Arrays.stream(requests).map(ContractRequest::getRequestID) + .collect(Collectors.toCollection(ArrayDeque::new)); do { int size; for (size = merkleQueue.size(); size > 1; size -= 2) { @@ -330,4 +318,4 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto return this.hash; } } -} \ No newline at end of file +} diff --git a/src/test/java/org/bdware/consistency/plugin/raft/algo/message/MessageTest.java b/src/test/java/org/bdware/consistency/plugin/raft/algo/message/MessageTest.java index 0dae1ba..04ddee7 100644 --- a/src/test/java/org/bdware/consistency/plugin/raft/algo/message/MessageTest.java +++ b/src/test/java/org/bdware/consistency/plugin/raft/algo/message/MessageTest.java @@ -72,8 +72,10 @@ public class MessageTest { assert msg.getPrevLogTerm() == message.getPrevLogTerm(); assert msg.getLeaderCommit() == message.getLeaderCommit(); assert msg.getEntries().size() == message.getEntries().size(); - assert msg.getEntries().get(0).getCommand().getAction().equals(message.getEntries().get(0).getCommand().getAction()); - assert msg.getEntries().get(9).getCommand().getAction().equals(message.getEntries().get(9).getCommand().getAction()); + assert msg.getEntries().get(0).getCommand().getAction() + .equals(message.getEntries().get(0).getCommand().getAction()); + assert msg.getEntries().get(9).getCommand().getAction() + .equals(message.getEntries().get(9).getCommand().getAction()); } @Test diff --git a/src/test/java/org/bdware/consistency/plugin/raft/algo/storage/LogManagerTest.java b/src/test/java/org/bdware/consistency/plugin/raft/algo/storage/LogManagerTest.java index 9930be4..864e081 100644 --- a/src/test/java/org/bdware/consistency/plugin/raft/algo/storage/LogManagerTest.java +++ b/src/test/java/org/bdware/consistency/plugin/raft/algo/storage/LogManagerTest.java @@ -10,7 +10,8 @@ import java.util.List; public class LogManagerTest { @Test public void testLogManager() { - LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); + LogManager logManager = + new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); List entries = new ArrayList<>(); LogEntry entry; for (int i = 0; i < 1000; i++) { @@ -28,7 +29,8 @@ public class LogManagerTest { @Test public void testLogManager2() { - LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); + LogManager logManager = + new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); LogEntry entry = logManager.getEntry(386); assert entry.getTerm() == 38; assert entry.getIndex() == 386; @@ -54,9 +56,11 @@ public class LogManagerTest { long index = logManager.append(entries); assert index == 1099; } + @Test public void testLogManager3() { - LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); + LogManager logManager = + new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); List entryList = logManager.getEntriesStartAt(677); assert entryList.size() == 1100 - 677; assert entryList.get(100).getIndex() == 777;