build: config spotless plugin and reformat code

This commit is contained in:
Frank.R.Wu 2023-06-15 11:07:58 +08:00
parent 50b8b7fd61
commit 947d117ccc
25 changed files with 449 additions and 445 deletions

View File

@ -2,6 +2,8 @@ plugins {
id 'java' id 'java'
} }
apply from: '../spotless.gradle'
dependencies { dependencies {
implementation project(":consistency-sdk") implementation project(":consistency-sdk")

View File

@ -7,8 +7,12 @@ import org.bdware.server.trustedmodel.ContractExecutor;
public abstract class AbstractContextContractExecutor implements ContractExecutor { public abstract class AbstractContextContractExecutor implements ContractExecutor {
static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf(); static protected IGlobalConf globalConf = ConsistencyPluginManager.getContext().getGlobalConf();
static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions(); static protected ICMActions cmActions = ConsistencyPluginManager.getContext().getCMActions();
static protected INetworkManager networkManager = ConsistencyPluginManager.getContext().getNetworkManager(); static protected INetworkManager networkManager =
static protected IMasterClientTCPAction masterClientTCPAction = ConsistencyPluginManager.getContext().getMasterClientTCPAction(); ConsistencyPluginManager.getContext().getNetworkManager();
static protected IMasterServerTCPAction masterServerTCPAction = ConsistencyPluginManager.getContext().getMasterServerTCPAction(); static protected IMasterClientTCPAction masterClientTCPAction =
static protected IMasterServerRecoverMechAction masterServerRecoverMechAction = ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction(); ConsistencyPluginManager.getContext().getMasterClientTCPAction();
static protected IMasterServerTCPAction masterServerTCPAction =
ConsistencyPluginManager.getContext().getMasterServerTCPAction();
static protected IMasterServerRecoverMechAction masterServerRecoverMechAction =
ConsistencyPluginManager.getContext().getMasterServerRecoverMechAction();
} }

View File

@ -25,7 +25,8 @@ public class ContractCluster implements TrustfulExecutorConnection<PubKeyNode> {
jo.addProperty("action", "contractSyncMessage"); jo.addProperty("action", "contractSyncMessage");
jo.addProperty("contractID", contractID); jo.addProperty("contractID", contractID);
jo.addProperty("data", ByteUtil.encodeBASE64(msg)); jo.addProperty("data", ByteUtil.encodeBASE64(msg));
ConsistencyPluginManager.getInstance().getContext().getNetworkManager().sendToAgent(node.pubkey, jo.toString()); ConsistencyPluginManager.getInstance().getContext().getNetworkManager()
.sendToAgent(node.pubkey, jo.toString());
} }
@Override @Override

View File

@ -26,7 +26,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
//TODO 追赶差下的调用 // TODO 追赶差下的调用
public class PBFTExecutor extends AbstractContextContractExecutor { public class PBFTExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class); private static final Logger LOGGER = LogManager.getLogger(PBFTExecutor.class);
final Object lock = new Object(); final Object lock = new Object();
@ -37,14 +37,13 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
// key为requestIDvalue为其seq // key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>(); Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>(); Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info; // MultiPointContractInfo info;
String contractID; String contractID;
PBFTAlgorithm pbft; PBFTAlgorithm pbft;
ContractCluster contractCluster; ContractCluster contractCluster;
boolean isMaster; boolean isMaster;
public PBFTExecutor( public PBFTExecutor(int c, String con_id, final String masterPubkey, String[] members) {
int c, String con_id, final String masterPubkey, String[] members) {
resultCount = c; resultCount = c;
contractID = con_id; contractID = con_id;
this.members = new ArrayList<>(); this.members = new ArrayList<>();
@ -65,7 +64,8 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
} }
contractCluster = new ContractCluster(contractID, this.members); contractCluster = new ContractCluster(contractID, this.members);
pbft.setConnection(contractCluster); pbft.setConnection(contractCluster);
final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); final MultiContractMeta cei =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
pbft.setCommitter(new Committer() { pbft.setCommitter(new Committer() {
@Override @Override
public void onCommit(ContractRequest data) { public void onCommit(ContractRequest data) {
@ -100,23 +100,20 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
pbft.setAtomSeq(request_index.get()); pbft.setAtomSeq(request_index.get());
} }
public ResultCallback createResultCallback( public ResultCallback createResultCallback(final String requestID,
final String requestID, final ResultCallback originalCb, final int count, final int request_seq,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) { final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count); ComponedContractResult componedContractResult = new ComponedContractResult(count);
// TODO 加对应的超时 // TODO 加对应的超时
return networkManager.createResultCallback( return networkManager.createResultCallback(requestID,
requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); new ResultMerger(originalCb, count, request_seq, contractID), count);
} }
public void sendRequest(String id, ContractRequest req, ResultCallback collector) { public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
// Map<String, Object> reqStr = new HashMap<>(); // Map<String, Object> reqStr = new HashMap<>();
// reqStr.put("uniReqID", id); // reqStr.put("uniReqID", id);
// reqStr.put("data", req); // reqStr.put("data", req);
// reqStr.put("action", "executeContractLocally"); // reqStr.put("action", "executeContractLocally");
ContractRequest cr2 = ContractRequest.parse(req.toByte()); ContractRequest cr2 = ContractRequest.parse(req.toByte());
cr2.setRequestID(id); cr2.setRequestID(id);
PBFTMessage request = new PBFTMessage(); PBFTMessage request = new PBFTMessage();
@ -127,20 +124,17 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
if (!networkManager.hasAgentConnection(node.pubkey)) { if (!networkManager.hasAgentConnection(node.pubkey)) {
LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null"); LOGGER.warn("cmNode " + node.pubkey.substring(0, 5) + " is null");
collector.onResult( collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\"," "{\"status\":\"Error\",\"result\":\"node offline\"," + "\"nodeID\":\""
+ "\"nodeID\":\"" + node + "\"," + "\"action\":\"onExecuteContractTrustfully\"}");
+ node // } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID)
+ "\"," // != RecoverFlag.Fine) {
+ "\"action\":\"onExecuteContractTrustfully\"}"); // collector.onResult(
// } else if (MasterServerRecoverMechAction.recoverStatus.get(node).get(contractID) // "{\"status\":\"Error\",\"result\":\"node recovering\","
// != RecoverFlag.Fine) { // + "\"nodeID\":\""
// collector.onResult( // + node
// "{\"status\":\"Error\",\"result\":\"node recovering\"," // + "\","
// + "\"nodeID\":\"" // + "\"action\":\"onExecuteContractTrustfully\"}");
// + node // contractCluster.sendMessage(node, request.getBytes());
// + "\","
// + "\"action\":\"onExecuteContractTrustfully\"}");
// contractCluster.sendMessage(node, request.getBytes());
} else { } else {
contractCluster.sendMessage(node, request.getBytes()); contractCluster.sendMessage(node, request.getBytes());
} }
@ -150,8 +144,8 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
masterServerTCPAction.getReqCache().put(contractID, new RequestCache()); masterServerTCPAction.getReqCache().put(contractID, new RequestCache());
} }
// TODO 多调多统一个seq的有多个请求这个需要改 // TODO 多调多统一个seq的有多个请求这个需要改
String[] nodes = String[] nodes = cmActions.getManager().multiContractRecorder
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); .getMultiContractMeta(contractID).getMembers();
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
@ -163,9 +157,11 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
} }
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { public void execute(String requestID, ContractRequest req, ResultCallback rc,
OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req)); LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); MultiContractMeta meta = cmActions.getManager().multiContractRecorder
.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) { if (meta == null || !meta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rc); cmActions.getManager().executeContractOnOtherNodes(req, rc);
return; return;
@ -176,9 +172,9 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally" // 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号 // 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak // TODO seqMap memory leak
//TODO // TODO
//TODO // TODO
if (null != requestID && requestID.endsWith("_mul")) { if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) { synchronized (lock) {
if (seqMap.containsKey(requestID)) { if (seqMap.containsKey(requestID)) {
@ -206,37 +202,32 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
} else { } else {
LOGGER.debug("invalidNodeNumOnResult"); LOGGER.debug("invalidNodeNumOnResult");
request_index.getAndDecrement(); request_index.getAndDecrement();
ContractResult finalResult = ContractResult finalResult = new ContractResult(ContractResult.Status.Error,
new ContractResult( new JsonPrimitive("node number unavailable, request refused."));
ContractResult.Status.Error,
new JsonPrimitive("node number unavailable, request refused."));
rc.onResult(JsonUtil.toJson(finalResult)); rc.onResult(JsonUtil.toJson(finalResult));
} }
// } // }
/* // 三个相同requestID进来的时候会有冲突 /*
// 仅在此处有冲突么 * // 三个相同requestID进来的时候会有冲突 // 仅在此处有冲突么 //
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally" * 这里是从MasterServer->MasterClient请求的是"executeContractLocally" req.seq =
req.seq = request_index.getAndIncrement(); * request_index.getAndIncrement(); req.needSeq = true; ResultCallback collector =
req.needSeq = true; * createResultCallback(id, rc, resultCount, req.getContractID());
ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); * MasterServerTCPAction.sync.sleep(id, collector); sendRequest(id, req, collector);
MasterServerTCPAction.sync.sleep(id, collector); */
sendRequest(id, req, collector);*/
} }
// 清理缓存的多点合约请求序号 // 清理缓存的多点合约请求序号
public void clearCache() { public void clearCache() {
final long time = System.currentTimeMillis() - 30000L; final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet() seqMap.entrySet().removeIf(entry -> {
.removeIf( MultiReqSeq cache = entry.getValue();
entry -> { if (null == cache) {
MultiReqSeq cache = entry.getValue(); return true;
if (null == cache) { }
return true; return cache.startTime < time;
} });
return cache.startTime < time;
});
} }
public static class ResultMerger extends ResultCallback { public static class ResultMerger extends ResultCallback {
@ -248,10 +239,7 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
ResultCallback originalCallback; ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点 Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
ResultMerger( ResultMerger(final ResultCallback originalCb, final int count, final int request_seq,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) { final String contractID) {
originalCallback = originalCb; originalCallback = originalCb;
this.count = count; this.count = count;
@ -266,16 +254,8 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
} }
public String getInfo() { public String getInfo() {
return "contractID=" return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order
+ contractID + " count=" + count + " ";
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
} }
@Override @Override
@ -289,19 +269,15 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
if (obj.has("nodeID")) { if (obj.has("nodeID")) {
String id = obj.get("nodeID").getAsString(); String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) { if (nodeIDs.contains(id)) {
LOGGER.debug( LOGGER.debug("ignored result because the result of node "
"ignored result because the result of node " + id.substring(0, 5) + " has been received");
+ id.substring(0, 5)
+ " has been received");
return; return;
} }
nodeIDs.add(id); nodeIDs.add(id);
} }
LOGGER.debug( LOGGER.debug(String.format("contractID=%s received=%s order=%d count=%d",
String.format( contractID, str, order.get(), count));
"contractID=%s received=%s order=%d count=%d",
contractID, str, order.get(), count));
componedContractResult.add(obj); componedContractResult.add(obj);
// 收集到所有结果 // 收集到所有结果
if (order.incrementAndGet() == count) { if (order.incrementAndGet() == count) {
@ -309,26 +285,23 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
finalResult.needSeq = true; finalResult.needSeq = true;
finalResult.seq = request_seq; finalResult.seq = request_seq;
// if (null == finalResult) { // if (null == finalResult) {
// finalResult = // finalResult =
// new ContractResult( // new ContractResult(
// ContractResult.Status.Exception, // ContractResult.Status.Exception,
// new JsonPrimitive( // new JsonPrimitive(
// "no nore than half of the // "no nore than half of the
// consistent result")); // consistent result"));
// originalCallback.onResult(new // originalCallback.onResult(new
// Gson().toJson(finalResult)); // Gson().toJson(finalResult));
// } else { // } else {
originalCallback.onResult(JsonUtil.toJson(finalResult)); originalCallback.onResult(JsonUtil.toJson(finalResult));
// } // }
LOGGER.debug( LOGGER.debug(String.format("%d results are the same: %s", finalResult.size,
String.format( finalResult.result));
"%d results are the same: %s",
finalResult.size, finalResult.result));
// 集群中事务序号+1 // 集群中事务序号+1
cmActions.getManager().multiContractRecorder cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID)
.getMultiContractMeta(contractID)
.nextSeqAtMaster(); .nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复 // recover其中无状态合约CP出错无需恢复
@ -338,26 +311,25 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
} }
for (String nodeID : nodesID) { for (String nodeID : nodesID) {
LOGGER.warn("node fails! " + nodeID); LOGGER.warn("node fails! " + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
== RecoverFlag.Fine) { .get(contractID) == RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap() masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover); .put(contractID, RecoverFlag.ToRecover);
} }
} }
for (String nodeID : nodesID) { for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
== RecoverFlag.ToRecover) { .get(contractID) == RecoverFlag.ToRecover) {
LOGGER.warn("node in recover " + nodeID); LOGGER.warn("node in recover " + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信 // 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复 // 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode( masterServerRecoverMechAction.restartContractFromCommonMode(nodeID,
nodeID, contractID); contractID);
} }
} }
} }
// clearCache(); // clearCache();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
LOGGER.warn("result exception!"); LOGGER.warn("result exception!");

View File

@ -36,11 +36,10 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
// key为requestIDvalue为其seq // key为requestIDvalue为其seq
Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>(); Map<String, MultiReqSeq> seqMap = new ConcurrentHashMap<>();
Map<String, ResultCache> resultCache = new ConcurrentHashMap<>(); Map<String, ResultCache> resultCache = new ConcurrentHashMap<>();
// MultiPointContractInfo info; // MultiPointContractInfo info;
String contractID; String contractID;
public RequestAllExecutor( public RequestAllExecutor(ContractExecType t, int c, String con_id) {
ContractExecType t, int c, String con_id) {
type = t; type = t;
resultCount = c; resultCount = c;
contractID = con_id; contractID = con_id;
@ -50,16 +49,13 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
request_index = new AtomicInteger(seq); request_index = new AtomicInteger(seq);
} }
public ResultCallback createResultCallback( public ResultCallback createResultCallback(final String requestID,
final String requestID, final ResultCallback originalCb, final int count, final int request_seq,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) { final String contractID) {
ComponedContractResult componedContractResult = new ComponedContractResult(count); ComponedContractResult componedContractResult = new ComponedContractResult(count);
// TODO 加对应的超时 // TODO 加对应的超时
return networkManager.createResultCallback( return networkManager.createResultCallback(requestID,
requestID, new ResultMerger(originalCb, count, request_seq, contractID), count); new ResultMerger(originalCb, count, request_seq, contractID), count);
} }
public void sendRequest(String id, ContractRequest req, ResultCallback collector) { public void sendRequest(String id, ContractRequest req, ResultCallback collector) {
@ -78,8 +74,8 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
LOGGER.debug(JsonUtil.toJson(req)); LOGGER.debug(JsonUtil.toJson(req));
String[] nodes = String[] nodes = cmActions.getManager().multiContractRecorder
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); .getMultiContractMeta(contractID).getMembers();
LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID()); LOGGER.info("cluster size=" + nodes.length + " contract " + req.getContractID());
LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes)); LOGGER.debug("contract " + req.getContractID() + " cluster: " + JsonUtil.toJson(nodes));
for (String node : nodes) { for (String node : nodes) {
@ -87,19 +83,13 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
if (!networkManager.hasAgentConnection(node)) { if (!networkManager.hasAgentConnection(node)) {
LOGGER.warn("cmNode " + node.substring(0, 5) + " is null"); LOGGER.warn("cmNode " + node.substring(0, 5) + " is null");
collector.onResult( collector.onResult(
"{\"status\":\"Error\",\"result\":\"node offline\"," "{\"status\":\"Error\",\"result\":\"node offline\"," + "\"nodeID\":\""
+ "\"nodeID\":\"" + node + "\"," + "\"action\":\"onExecuteContractTrustfully\"}");
+ node } else if (masterServerRecoverMechAction.getRecoverStatusMap().get(node)
+ "\"," .get(contractID) != RecoverFlag.Fine) {
+ "\"action\":\"onExecuteContractTrustfully\"}");
} else if (masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)
!= RecoverFlag.Fine) {
collector.onResult( collector.onResult(
"{\"status\":\"Error\",\"result\":\"node recovering\"," "{\"status\":\"Error\",\"result\":\"node recovering\"," + "\"nodeID\":\""
+ "\"nodeID\":\"" + node + "\"," + "\"action\":\"onExecuteContractTrustfully\"}");
+ node
+ "\","
+ "\"action\":\"onExecuteContractTrustfully\"}");
networkManager.sendToAgent(node, sendStr); networkManager.sendToAgent(node, sendStr);
} else { } else {
LOGGER.info("send request to cmNode " + node.substring(0, 5)); LOGGER.info("send request to cmNode " + node.substring(0, 5));
@ -109,16 +99,16 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
} }
public boolean checkCurNodeNumValid() { public boolean checkCurNodeNumValid() {
String[] nodes = String[] nodes = cmActions.getManager().multiContractRecorder
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); .getMultiContractMeta(contractID).getMembers();
int validNode = 0; int validNode = 0;
Map<String, String> mapResult = new HashMap<>(); Map<String, String> mapResult = new HashMap<>();
for (String node : nodes) { for (String node : nodes) {
mapResult.put(node.substring(0, 5), String.format("%s %s", networkManager.hasAgentConnection(node) + "", mapResult.put(node.substring(0, 5), String.format("%s %s",
networkManager.hasAgentConnection(node) + "",
masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID))); masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)));
if (networkManager.hasAgentConnection(node) if (networkManager.hasAgentConnection(node) && masterServerRecoverMechAction
&& masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) .getRecoverStatusMap().get(node).get(contractID) == RecoverFlag.Fine) {
== RecoverFlag.Fine) {
validNode++; validNode++;
} }
} }
@ -132,9 +122,11 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
} }
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { public void execute(String requestID, ContractRequest req, ResultCallback rc,
OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req)); LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); MultiContractMeta meta = cmActions.getManager().multiContractRecorder
.getMultiContractMeta(req.getContractID());
if (meta == null || !meta.isMaster()) { if (meta == null || !meta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rc); cmActions.getManager().executeContractOnOtherNodes(req, rc);
return; return;
@ -146,9 +138,9 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally" // 这里是从MasterServer->MasterClient请求的是"executeContractLocally"
// 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号 // 如果是多点合约的请求A1A2A3的序号应该一致不能分配一个新的seq根据requestID判断是否不需要重新分配一个序号
//TODO seqMap memory leak // TODO seqMap memory leak
//TODO // TODO
//TODO // TODO
if (null != requestID && requestID.endsWith("_mul")) { if (null != requestID && requestID.endsWith("_mul")) {
synchronized (lock) { synchronized (lock) {
if (seqMap.containsKey(requestID)) { if (seqMap.containsKey(requestID)) {
@ -176,37 +168,32 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
} else { } else {
LOGGER.debug("invalidNodeNumOnResult"); LOGGER.debug("invalidNodeNumOnResult");
request_index.getAndDecrement(); request_index.getAndDecrement();
ContractResult finalResult = ContractResult finalResult = new ContractResult(ContractResult.Status.Error,
new ContractResult( new JsonPrimitive("node number unavailable, request refused."));
ContractResult.Status.Error,
new JsonPrimitive("node number unavailable, request refused."));
rc.onResult(JsonUtil.toJson(finalResult)); rc.onResult(JsonUtil.toJson(finalResult));
} }
// } // }
/* // 三个相同requestID进来的时候会有冲突 /*
// 仅在此处有冲突么 * // 三个相同requestID进来的时候会有冲突 // 仅在此处有冲突么 //
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally" * 这里是从MasterServer->MasterClient请求的是"executeContractLocally" req.seq =
req.seq = request_index.getAndIncrement(); * request_index.getAndIncrement(); req.needSeq = true; ResultCallback collector =
req.needSeq = true; * createResultCallback(id, rc, resultCount, req.getContractID());
ResultCallback collector = createResultCallback(id, rc, resultCount, req.getContractID()); * MasterServerTCPAction.sync.sleep(id, collector); sendRequest(id, req, collector);
MasterServerTCPAction.sync.sleep(id, collector); */
sendRequest(id, req, collector);*/
} }
// 清理缓存的多点合约请求序号 // 清理缓存的多点合约请求序号
public void clearCache() { public void clearCache() {
final long time = System.currentTimeMillis() - 30000L; final long time = System.currentTimeMillis() - 30000L;
seqMap.entrySet() seqMap.entrySet().removeIf(entry -> {
.removeIf( MultiReqSeq cache = entry.getValue();
entry -> { if (null == cache) {
MultiReqSeq cache = entry.getValue(); return true;
if (null == cache) { }
return true; return cache.startTime < time;
} });
return cache.startTime < time;
});
} }
public static class ResultMerger extends ResultCallback implements NotifiableResultMerger { public static class ResultMerger extends ResultCallback implements NotifiableResultMerger {
@ -218,10 +205,7 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
ResultCallback originalCallback; ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点 Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
public ResultMerger( public ResultMerger(final ResultCallback originalCb, final int count, final int request_seq,
final ResultCallback originalCb,
final int count,
final int request_seq,
final String contractID) { final String contractID) {
originalCallback = originalCb; originalCallback = originalCb;
this.count = count; this.count = count;
@ -236,16 +220,8 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
} }
public String getInfo() { public String getInfo() {
return "contractID=" return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order
+ contractID + " count=" + count + " ";
+ " 收到第 "
+ order
+ " 个节点回复 : "
+ " order="
+ order
+ " count="
+ count
+ " ";
} }
@Override @Override
@ -259,19 +235,15 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
if (obj.has("nodeID")) { if (obj.has("nodeID")) {
String id = obj.get("nodeID").getAsString(); String id = obj.get("nodeID").getAsString();
if (nodeIDs.contains(id)) { if (nodeIDs.contains(id)) {
LOGGER.debug( LOGGER.debug("ignored result because the result of node "
"ignored result because the result of node " + id.substring(0, 5) + " has been received");
+ id.substring(0, 5)
+ " has been received");
return; return;
} }
nodeIDs.add(id); nodeIDs.add(id);
} }
LOGGER.debug( LOGGER.debug(String.format("contractID=%s received=%s order=%d count=%d",
String.format( contractID, str, order.get(), count));
"contractID=%s received=%s order=%d count=%d",
contractID, str, order.get(), count));
componedContractResult.add(obj); componedContractResult.add(obj);
// 收集到所有结果 // 收集到所有结果
if (order.incrementAndGet() == count) { if (order.incrementAndGet() == count) {
@ -279,26 +251,23 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
finalResult.needSeq = true; finalResult.needSeq = true;
finalResult.seq = request_seq; finalResult.seq = request_seq;
// if (null == finalResult) { // if (null == finalResult) {
// finalResult = // finalResult =
// new ContractResult( // new ContractResult(
// ContractResult.Status.Exception, // ContractResult.Status.Exception,
// new JsonPrimitive( // new JsonPrimitive(
// "no nore than half of the // "no nore than half of the
// consistent result")); // consistent result"));
// originalCallback.onResult(new // originalCallback.onResult(new
// Gson().toJson(finalResult)); // Gson().toJson(finalResult));
// } else { // } else {
originalCallback.onResult(JsonUtil.toJson(finalResult)); originalCallback.onResult(JsonUtil.toJson(finalResult));
// } // }
LOGGER.debug( LOGGER.debug(String.format("%d results are the same: %s", finalResult.size,
String.format( finalResult.result));
"%d results are the same: %s",
finalResult.size, finalResult.result));
// 集群中事务序号+1 // 集群中事务序号+1
cmActions.getManager().multiContractRecorder cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID)
.getMultiContractMeta(contractID)
.nextSeqAtMaster(); .nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复 // recover其中无状态合约CP出错无需恢复
@ -308,26 +277,25 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
} }
for (String nodeID : nodesID) { for (String nodeID : nodesID) {
LOGGER.warn("node fails! " + nodeID); LOGGER.warn("node fails! " + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
== RecoverFlag.Fine) { .get(contractID) == RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap() masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
.get(nodeID)
.put(contractID, RecoverFlag.ToRecover); .put(contractID, RecoverFlag.ToRecover);
} }
} }
for (String nodeID : nodesID) { for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
== RecoverFlag.ToRecover) { .get(contractID) == RecoverFlag.ToRecover) {
LOGGER.warn("node in recover " + nodeID); LOGGER.warn("node in recover " + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信 // 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复 // 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode( masterServerRecoverMechAction.restartContractFromCommonMode(nodeID,
nodeID, contractID); contractID);
} }
} }
} }
// clearCache(); // clearCache();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
LOGGER.warn("result exception!"); LOGGER.warn("result exception!");

View File

@ -15,7 +15,6 @@ public class RequestAllResponseFirstFactory implements ContractExecutorFactory {
@Override @Override
public ContractExecutor getInstance(Map<String, Object> args) { public ContractExecutor getInstance(Map<String, Object> args) {
String contractID = (String) args.get("contractID"); String contractID = (String) args.get("contractID");
return new RequestAllExecutor( return new RequestAllExecutor(ContractExecType.RequestAllResponseFirst, 1, contractID);
ContractExecType.RequestAllResponseFirst, 1, contractID);
} }
} }

View File

@ -16,9 +16,7 @@ public class RequestAllResponseHalfFactory implements ContractExecutorFactory {
public ContractExecutor getInstance(Map<String, Object> args) { public ContractExecutor getInstance(Map<String, Object> args) {
int nodeSize = (int) args.get("nodeSize"); int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID"); String contractID = (String) args.get("contractID");
return new RequestAllExecutor( return new RequestAllExecutor(ContractExecType.RequestAllResponseHalf, nodeSize / 2 + 1,
ContractExecType.RequestAllResponseHalf,
nodeSize / 2 + 1,
contractID); contractID);
} }
} }

View File

@ -39,7 +39,8 @@ public class RAFTExecutor extends AbstractContextContractExecutor {
members.add(pubkeyNode); members.add(pubkeyNode);
} }
contractCluster = new ContractCluster(contractID, members); contractCluster = new ContractCluster(contractID, members);
final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); final MultiContractMeta cei =
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
raft = new RaftAlgorithm(config, selfNode, contractCluster, new Committer() { raft = new RaftAlgorithm(config, selfNode, contractCluster, new Committer() {
@Override @Override
public void onCommit(ContractRequest request) { public void onCommit(ContractRequest request) {
@ -63,7 +64,7 @@ public class RAFTExecutor extends AbstractContextContractExecutor {
} }
}); });
//TODO 在改变角色时需要通知node cluster // TODO 在改变角色时需要通知node cluster
if (cei.isMaster()) { if (cei.isMaster()) {
raft.convertToLeader(); raft.convertToLeader();
} else { } else {
@ -72,9 +73,11 @@ public class RAFTExecutor extends AbstractContextContractExecutor {
} }
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { public void execute(String requestID, ContractRequest req, ResultCallback rcb,
OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req)); LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta multiContractMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); MultiContractMeta multiContractMeta = cmActions.getManager().multiContractRecorder
.getMultiContractMeta(req.getContractID());
if (multiContractMeta == null || !multiContractMeta.isMaster()) { if (multiContractMeta == null || !multiContractMeta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rcb); cmActions.getManager().executeContractOnOtherNodes(req, rcb);
return; return;
@ -82,8 +85,9 @@ public class RAFTExecutor extends AbstractContextContractExecutor {
ResultCallback collector; ResultCallback collector;
int count = contractCluster.getNodes().size(); int count = contractCluster.getNodes().size();
collector = networkManager.createResultCallback( collector = networkManager.createResultCallback(requestID,
requestID, new RequestAllExecutor.ResultMerger(rcb, count, req.seq, req.getContractID()), count); new RequestAllExecutor.ResultMerger(rcb, count, req.seq, req.getContractID()),
count);
masterServerTCPAction.getSync().sleep(requestID, collector); masterServerTCPAction.getSync().sleep(requestID, collector);
raft.insertLogEntry(req); raft.insertLogEntry(req);
} }

View File

@ -1,6 +1,5 @@
package org.bdware.consistency.plugin.raft; package org.bdware.consistency.plugin.raft;
import org.bdware.consistency.plugin.pbft.PBFTExecutor;
import org.bdware.sdk.consistency.api.ContractExecutorFactory; import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor; import org.bdware.server.trustedmodel.ContractExecutor;
@ -14,9 +13,9 @@ public class RAFTExecutorFactory implements ContractExecutorFactory {
@Override @Override
public ContractExecutor getInstance(Map<String, Object> args) { public ContractExecutor getInstance(Map<String, Object> args) {
// int nodeSize = (int) args.get("nodeSize"); // int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID"); String contractID = (String) args.get("contractID");
String[] memberPubKeys = (String[]) args.get("members"); String[] memberPubKeys = (String[]) args.get("members");
return new RAFTExecutor(contractID, memberPubKeys); return new RAFTExecutor(contractID, memberPubKeys);
} }
} }

View File

@ -19,11 +19,14 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
public class RaftAlgorithm implements CommitAlgorithm { public class RaftAlgorithm implements CommitAlgorithm {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(RaftAlgorithm.class); private static final Logger LOGGER =
org.apache.logging.log4j.LogManager.getLogger(RaftAlgorithm.class);
private ContractCluster connection; private ContractCluster connection;
Committer committer; Committer committer;
private enum State {LEADER, FOLLOWER, CANDIDATE} private enum State {
LEADER, FOLLOWER, CANDIDATE
}
State state; State state;
PubKeyNode leader; PubKeyNode leader;
@ -52,14 +55,15 @@ public class RaftAlgorithm implements CommitAlgorithm {
TimerTask heartbeatTimerTask; TimerTask heartbeatTimerTask;
long lastActivated; long lastActivated;
//config // config
RaftConfig config; RaftConfig config;
// Locks and Conditions // Locks and Conditions
private Lock lock = new ReentrantLock(); private Lock lock = new ReentrantLock();
public RaftAlgorithm(RaftConfig config, PubKeyNode node, ContractCluster conn, Committer committer) { public RaftAlgorithm(RaftConfig config, PubKeyNode node, ContractCluster conn,
Committer committer) {
this.config = config; this.config = config;
self = node; self = node;
connection = conn; connection = conn;
@ -117,7 +121,8 @@ public class RaftAlgorithm implements CommitAlgorithm {
case RequestVote: case RequestVote:
response = onRequestVote((RequestVote) message); response = onRequestVote((RequestVote) message);
persist(); persist();
connection.sendMessage((PubKeyNode) sender, new RaftMessageParser(response).getBytes()); connection.sendMessage((PubKeyNode) sender,
new RaftMessageParser(response).getBytes());
break; break;
case RequestVoteResponse: case RequestVoteResponse:
onRequestVoteResponse((RequestVoteResponse) message); onRequestVoteResponse((RequestVoteResponse) message);
@ -125,7 +130,8 @@ public class RaftAlgorithm implements CommitAlgorithm {
case AppendEntries: case AppendEntries:
response = onAppendEntries((AppendEntries) message); response = onAppendEntries((AppendEntries) message);
persist(); persist();
connection.sendMessage((PubKeyNode) sender, new RaftMessageParser(response).getBytes()); connection.sendMessage((PubKeyNode) sender,
new RaftMessageParser(response).getBytes());
break; break;
case AppendEntriesResponse: case AppendEntriesResponse:
onAppendEntriesResponse((PubKeyNode) sender, (AppendEntriesResponse) message); onAppendEntriesResponse((PubKeyNode) sender, (AppendEntriesResponse) message);
@ -143,14 +149,16 @@ public class RaftAlgorithm implements CommitAlgorithm {
// in lock // in lock
private RequestVoteResponse onRequestVote(RequestVote request) { private RequestVoteResponse onRequestVote(RequestVote request) {
if (request.getTerm() < currentTerm) { if (request.getTerm() < currentTerm) {
LOGGER.debug("Rejected RequestVote message in term {}" + LOGGER.debug(
", because the message is stale. My term is {}.", request.getTerm(), currentTerm); "Rejected RequestVote message in term {}"
+ ", because the message is stale. My term is {}.",
request.getTerm(), currentTerm);
return new RequestVoteResponse(currentTerm, false); return new RequestVoteResponse(currentTerm, false);
} }
if (request.getTerm() == currentTerm) { if (request.getTerm() == currentTerm) {
if (votedFor != null && !votedFor.pubkey.equals(request.getCandidate().pubkey)) { if (votedFor != null && !votedFor.pubkey.equals(request.getCandidate().pubkey)) {
LOGGER.debug("Rejected RequestVote message in term {}" + LOGGER.debug("Rejected RequestVote message in term {}"
", because I have voted for another server. ", request.getTerm()); + ", because I have voted for another server. ", request.getTerm());
return new RequestVoteResponse(currentTerm, false); return new RequestVoteResponse(currentTerm, false);
} }
} else { } else {
@ -179,7 +187,8 @@ public class RaftAlgorithm implements CommitAlgorithm {
leader = null; leader = null;
votedFor = null; votedFor = null;
convertToFollower(); convertToFollower();
} else if (state == State.CANDIDATE && response.getTerm() == currentTerm && response.isGranted()) { } else if (state == State.CANDIDATE && response.getTerm() == currentTerm
&& response.isGranted()) {
++grantedVoteCount; ++grantedVoteCount;
if (grantedVoteCount > servers.size() / 2) { if (grantedVoteCount > servers.size() / 2) {
convertToLeader(); convertToLeader();
@ -192,8 +201,10 @@ public class RaftAlgorithm implements CommitAlgorithm {
private AppendEntriesResponse onAppendEntries(AppendEntries request) { private AppendEntriesResponse onAppendEntries(AppendEntries request) {
LogEntry lastEntry = logManager.getLastEntry(); LogEntry lastEntry = logManager.getLastEntry();
if (request.getTerm() < currentTerm) { if (request.getTerm() < currentTerm) {
LOGGER.debug("Rejected AppendEntries message in term {}" + LOGGER.debug(
", because the message is stale. My term is {}.", request.getTerm(), currentTerm); "Rejected AppendEntries message in term {}"
+ ", because the message is stale. My term is {}.",
request.getTerm(), currentTerm);
return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self);
} }
if (request.getTerm() > currentTerm) { if (request.getTerm() > currentTerm) {
@ -207,8 +218,9 @@ public class RaftAlgorithm implements CommitAlgorithm {
LOGGER.info("new leader={}", leader.pubkey); LOGGER.info("new leader={}", leader.pubkey);
} }
if (!leader.pubkey.equals(request.getLeader().pubkey)) { if (!leader.pubkey.equals(request.getLeader().pubkey)) {
LOGGER.warn("Another peer={} declares that it is the leader " + LOGGER.warn(
"at term={} which was occupied by leader={}", "Another peer={} declares that it is the leader "
+ "at term={} which was occupied by leader={}",
request.getLeader().pubkey, request.getTerm(), leader.pubkey); request.getLeader().pubkey, request.getTerm(), leader.pubkey);
return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self);
} }
@ -218,7 +230,8 @@ public class RaftAlgorithm implements CommitAlgorithm {
if (entry == null) { if (entry == null) {
return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self);
} }
if (entry.getTerm() != request.getTerm() || logManager.getLastEntryIndex() > request.getPrevLogIndex()) { if (entry.getTerm() != request.getTerm()
|| logManager.getLastEntryIndex() > request.getPrevLogIndex()) {
logManager.deleteEntriesStartAt(request.getPrevLogIndex()); logManager.deleteEntriesStartAt(request.getPrevLogIndex());
lastEntry = logManager.getLastEntry(); lastEntry = logManager.getLastEntry();
} }
@ -229,9 +242,11 @@ public class RaftAlgorithm implements CommitAlgorithm {
logManager.append(newEntries); logManager.append(newEntries);
lastEntry = logManager.getLastEntry(); lastEntry = logManager.getLastEntry();
if (request.getLeaderCommit() > commitIndex) { if (request.getLeaderCommit() > commitIndex) {
commitIndex = Math.min(request.getLeaderCommit(), newEntries.get(newEntries.size() - 1).getIndex()); commitIndex = Math.min(request.getLeaderCommit(),
newEntries.get(newEntries.size() - 1).getIndex());
} }
LOGGER.info("RECEIVE APPENDENTRY: commitIndex=" + commitIndex + " lastApplied:" + lastApplied); LOGGER.info(
"RECEIVE APPENDENTRY: commitIndex=" + commitIndex + " lastApplied:" + lastApplied);
while (commitIndex > lastApplied) { while (commitIndex > lastApplied) {
applyToStateMachine(lastApplied + 1); applyToStateMachine(lastApplied + 1);
lastApplied++; lastApplied++;
@ -248,9 +263,11 @@ public class RaftAlgorithm implements CommitAlgorithm {
convertToFollower(); convertToFollower();
} else if (state == State.LEADER && response.getTerm() == currentTerm) { } else if (state == State.LEADER && response.getTerm() == currentTerm) {
if (response.isSuccess()) { if (response.isSuccess()) {
if (matchIndex.containsKey(sender.pubkey) && matchIndex.get(sender.pubkey) > response.getLastLogIndex()) { if (matchIndex.containsKey(sender.pubkey)
&& matchIndex.get(sender.pubkey) > response.getLastLogIndex()) {
LOGGER.warn("node={} matchIndex decrease! prev matchIndex={}, lastLogIndex={}", LOGGER.warn("node={} matchIndex decrease! prev matchIndex={}, lastLogIndex={}",
sender.pubkey, matchIndex.get(sender.pubkey), response.getLastLogIndex()); sender.pubkey, matchIndex.get(sender.pubkey),
response.getLastLogIndex());
throw new RuntimeException("matchIndex error"); throw new RuntimeException("matchIndex error");
} }
matchIndex.put(sender.pubkey, response.getLastLogIndex()); matchIndex.put(sender.pubkey, response.getLastLogIndex());
@ -286,7 +303,8 @@ public class RaftAlgorithm implements CommitAlgorithm {
if (lastLogIndex >= index) { if (lastLogIndex >= index) {
sendAppendEntries(node, index); sendAppendEntries(node, index);
} else { } else {
AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), lastEntry.getTerm(), commitIndex, null); AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(),
lastEntry.getTerm(), commitIndex, null);
connection.sendMessage(node, new RaftMessageParser(req).getBytes()); connection.sendMessage(node, new RaftMessageParser(req).getBytes());
} }
}); });
@ -310,7 +328,8 @@ public class RaftAlgorithm implements CommitAlgorithm {
private void sendAppendEntries(PubKeyNode follower, long logIndex) { private void sendAppendEntries(PubKeyNode follower, long logIndex) {
LogEntry prevLog = logManager.getEntry(logIndex - 1); LogEntry prevLog = logManager.getEntry(logIndex - 1);
List<LogEntry> entries = logManager.getEntriesStartAt(logIndex); List<LogEntry> entries = logManager.getEntriesStartAt(logIndex);
AppendEntries request = new AppendEntries(currentTerm, leader, prevLog.getIndex(), prevLog.getTerm(), commitIndex, entries); AppendEntries request = new AppendEntries(currentTerm, leader, prevLog.getIndex(),
prevLog.getTerm(), commitIndex, entries);
connection.sendMessage(follower, new RaftMessageParser(request).getBytes()); connection.sendMessage(follower, new RaftMessageParser(request).getBytes());
} }
@ -342,8 +361,9 @@ public class RaftAlgorithm implements CommitAlgorithm {
else else
nextIndex.put(server.pubkey, lastEntry.getIndex() + 1); nextIndex.put(server.pubkey, lastEntry.getIndex() + 1);
matchIndex.put(server.pubkey, 0L); matchIndex.put(server.pubkey, 0L);
// AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), lastEntry.getTerm(), commitIndex, null); // AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(),
// connection.sendMessage(server,new RaftMessageParser(req).getBytes()); // lastEntry.getTerm(), commitIndex, null);
// connection.sendMessage(server,new RaftMessageParser(req).getBytes());
} }
} }
} }
@ -379,7 +399,8 @@ public class RaftAlgorithm implements CommitAlgorithm {
private boolean isServerValid(PubKeyNode server) { private boolean isServerValid(PubKeyNode server) {
if (server == null) return false; if (server == null)
return false;
List<PubKeyNode> nodes = connection.getNodes(); List<PubKeyNode> nodes = connection.getNodes();
for (PubKeyNode node : nodes) { for (PubKeyNode node : nodes) {
if (node.pubkey.equals(server.pubkey)) { if (node.pubkey.equals(server.pubkey)) {

View File

@ -23,7 +23,8 @@ public class AppendEntries implements IRaftMessage {
this.type = RaftMessageType.AppendEntries; this.type = RaftMessageType.AppendEntries;
} }
public AppendEntries(long term, PubKeyNode leader, long prevLogIndex, long prevLogTerm, long leaderCommit, List<LogEntry> entries) { public AppendEntries(long term, PubKeyNode leader, long prevLogIndex, long prevLogTerm,
long leaderCommit, List<LogEntry> entries) {
this.type = RaftMessageType.AppendEntries; this.type = RaftMessageType.AppendEntries;
this.term = term; this.term = term;
this.leader = leader; this.leader = leader;

View File

@ -1,10 +1,10 @@
package org.bdware.consistency.plugin.raft.algo.message; package org.bdware.consistency.plugin.raft.algo.message;
public class EmptyMessage implements IRaftMessage{ public class EmptyMessage implements IRaftMessage {
RaftMessageType type; RaftMessageType type;
public EmptyMessage() { public EmptyMessage() {
type=RaftMessageType.Unknown; type = RaftMessageType.Unknown;
} }
@Override @Override

View File

@ -1,11 +1,7 @@
package org.bdware.consistency.plugin.raft.algo.message; package org.bdware.consistency.plugin.raft.algo.message;
public enum RaftMessageType { public enum RaftMessageType {
RequestVote(0), RequestVote(0), RequestVoteResponse(1), AppendEntries(2), AppendEntriesResponse(3), Unknown(4);
RequestVoteResponse(1),
AppendEntries(2),
AppendEntriesResponse(3),
Unknown(4);
private final int type; private final int type;

View File

@ -67,7 +67,8 @@ public class LogEntry {
int dataLen = ByteUtil.readInt(bi); int dataLen = ByteUtil.readInt(bi);
if (dataLen > 0) { if (dataLen > 0) {
byte[] dataByte = ByteUtil.readBytes(bi, dataLen); byte[] dataByte = ByteUtil.readBytes(bi, dataLen);
entry.command = JsonUtil.fromJson(new String(dataByte, StandardCharsets.UTF_8), ContractRequest.class); entry.command = JsonUtil.fromJson(new String(dataByte, StandardCharsets.UTF_8),
ContractRequest.class);
} else { } else {
entry.command = null; entry.command = null;
} }

View File

@ -11,10 +11,14 @@ import java.util.*;
public class LogManager { public class LogManager {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(LogManager.class); private static final Logger LOGGER =
org.apache.logging.log4j.LogManager.getLogger(LogManager.class);
private final String logDir; private final String logDir;
private final TreeMap<Long, String> segmentStartIndexMap = new TreeMap<>(); // key is start index, value is file name of the segment private final TreeMap<Long, String> segmentStartIndexMap = new TreeMap<>(); // key is start
// index, value is
// file name of the
// segment
private int maxSegmentFileSize; private int maxSegmentFileSize;
private Segment lastSegment; private Segment lastSegment;
@ -62,7 +66,8 @@ public class LogManager {
if (index >= lastSegment.getStartIndex() && index <= lastSegment.getEndIndex()) { if (index >= lastSegment.getStartIndex() && index <= lastSegment.getEndIndex()) {
return lastSegment.getEntry(index); return lastSegment.getEntry(index);
} }
Segment segment = Segment.loadSegment(logDir, segmentStartIndexMap.floorEntry(index).getValue()); Segment segment =
Segment.loadSegment(logDir, segmentStartIndexMap.floorEntry(index).getValue());
return segment.getEntry(index); return segment.getEntry(index);
} }
@ -70,16 +75,17 @@ public class LogManager {
long newLastLogIndex = this.getLastEntryIndex(); long newLastLogIndex = this.getLastEntryIndex();
newLastLogIndex++; newLastLogIndex++;
byte[] entryBytes = entry.getBytes(); byte[] entryBytes = entry.getBytes();
LOGGER.info("AppendLogEntry: last=" + this.getLastEntryIndex() + " -> toAdd=" + entry.getIndex()); LOGGER.info("AppendLogEntry: last=" + this.getLastEntryIndex() + " -> toAdd="
+ entry.getIndex());
try { try {
if (segmentStartIndexMap.isEmpty() || lastSegment == null || if (segmentStartIndexMap.isEmpty() || lastSegment == null
lastSegment.getFileSize() + entryBytes.length > maxSegmentFileSize) { || lastSegment.getFileSize() + entryBytes.length > maxSegmentFileSize) {
// close file and rename // close file and rename
if (lastSegment != null) { if (lastSegment != null) {
FileUtil.closeFile(lastSegment.getRandomAccessFile()); FileUtil.closeFile(lastSegment.getRandomAccessFile());
String newFileName = String.format("%016x-%016x", String newFileName = String.format("%016x-%016x", lastSegment.getStartIndex(),
lastSegment.getStartIndex(), lastSegment.getEndIndex()); lastSegment.getEndIndex());
String newFullFileName = logDir + File.separator + newFileName; String newFullFileName = logDir + File.separator + newFileName;
File newFile = new File(newFullFileName); File newFile = new File(newFullFileName);
String oldFullFileName = logDir + File.separator + lastSegment.getFileName(); String oldFullFileName = logDir + File.separator + lastSegment.getFileName();
@ -160,7 +166,8 @@ public class LogManager {
} }
private void fillMap(File dir) { private void fillMap(File dir) {
if (dir == null) return; if (dir == null)
return;
File[] files = dir.listFiles(); File[] files = dir.listFiles();
if (files == null) { if (files == null) {
LOGGER.warn("get file in dir({}) error", dir.getName()); LOGGER.warn("get file in dir({}) error", dir.getName());

View File

@ -1,18 +1,17 @@
package org.bdware.consistency.plugin.raft.algo.storage; package org.bdware.consistency.plugin.raft.algo.storage;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.raft.algo.util.FileUtil; import org.bdware.consistency.plugin.raft.algo.util.FileUtil;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public class Segment { public class Segment {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(Segment.class); private static final Logger LOGGER =
org.apache.logging.log4j.LogManager.getLogger(Segment.class);
private long startIndex; private long startIndex;
private long endIndex; private long endIndex;
private long fileSize; private long fileSize;

View File

@ -7,6 +7,7 @@ public class StateManager {
String filename; String filename;
static long currentTerm; static long currentTerm;
static PubKeyNode node; static PubKeyNode node;
public static long loadCurrentTerm() { public static long loadCurrentTerm() {
return currentTerm; return currentTerm;
} }

View File

@ -2,20 +2,18 @@ package org.bdware.consistency.plugin.raft.algo.util;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.raft.algo.message.IRaftMessage;
import org.bdware.consistency.plugin.raft.algo.storage.LogEntry; import org.bdware.consistency.plugin.raft.algo.storage.LogEntry;
import org.bdware.consistency.plugin.raft.algo.storage.Segment;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.zip.CRC32; import java.util.zip.CRC32;
public class FileUtil { public class FileUtil {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(FileUtil.class); private static final Logger LOGGER =
org.apache.logging.log4j.LogManager.getLogger(FileUtil.class);
public static RandomAccessFile createFile(String dir, String fileName, String mode) { public static RandomAccessFile createFile(String dir, String fileName, String mode) {
try { try {

View File

@ -39,9 +39,8 @@ public class HeartBeatUtil {
public synchronized void schedule(TimerTask timerTask, int delay, int period) { public synchronized void schedule(TimerTask timerTask, int delay, int period) {
try { try {
if (!recordedFuture.containsKey(timerTask)) { if (!recordedFuture.containsKey(timerTask)) {
ScheduledFuture<?> future = ScheduledFuture<?> future = ContractManager.scheduledThreadPool
ContractManager.scheduledThreadPool.scheduleWithFixedDelay( .scheduleWithFixedDelay(timerTask, delay, period, TimeUnit.MILLISECONDS);
timerTask, delay, period, TimeUnit.MILLISECONDS);
recordedFuture.put(timerTask, future); recordedFuture.put(timerTask, future);
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -24,30 +24,31 @@ public class RequestOnceExecutor extends AbstractContextContractExecutor {
} }
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { public void execute(String requestID, ContractRequest req, ResultCallback rc,
ResultCallback cb = OnHashCallback hcb) {
new ResultCallback() { ResultCallback cb = new ResultCallback() {
@Override @Override
public void onResult(String str) { public void onResult(String str) {
LOGGER.debug(str); LOGGER.debug(str);
JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
JsonObject result = JsonObject result =
JsonParser.parseString(jo.get("data").getAsString()) JsonParser.parseString(jo.get("data").getAsString()).getAsJsonObject();
.getAsJsonObject(); for (String key : result.keySet())
for (String key : result.keySet()) jo.add(key, result.get(key)); jo.add(key, result.get(key));
jo.remove("action"); jo.remove("action");
jo.addProperty("action", "onExecuteResult"); jo.addProperty("action", "onExecuteResult");
LOGGER.debug(jo.toString()); LOGGER.debug(jo.toString());
rc.onResult(jo.toString()); rc.onResult(jo.toString());
} }
}; };
masterServerTCPAction.getSync().sleep(requestID, cb); masterServerTCPAction.getSync().sleep(requestID, cb);
String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); String[] members = cmActions.getManager().multiContractRecorder
.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) { for (int i = 0; i < members.length; i++) {
LOGGER.info("[members]:" + members.length); LOGGER.info("[members]:" + members.length);
int size = members.length; int size = members.length;
String nodeID = members[order.incrementAndGet() % size]; String nodeID = members[order.incrementAndGet() % size];
//ADD Connect // ADD Connect
Map<String, Object> obj = new HashMap<>(); Map<String, Object> obj = new HashMap<>();
obj.put("action", "executeContractLocally"); obj.put("action", "executeContractLocally");
obj.put("requestID", requestID); obj.put("requestID", requestID);
@ -56,8 +57,7 @@ public class RequestOnceExecutor extends AbstractContextContractExecutor {
networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj)); networkManager.sendToAgent(nodeID, JsonUtil.toJson(obj));
return; return;
} }
rc.onResult( rc.onResult("{\"status\":\"Error\",\"result\":\"all nodes "
"{\"status\":\"Error\",\"result\":\"all nodes " + " offline\",\"action\":\"onExecuteContract\"}");
+ " offline\",\"action\":\"onExecuteContract\"}");
} }
} }

View File

@ -25,38 +25,39 @@ public class ResponseOnceExecutor extends AbstractContextContractExecutor {
} }
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { public void execute(String requestID, ContractRequest req, ResultCallback rc,
OnHashCallback hcb) {
executeInternal(requestID, rc, req, 2); executeInternal(requestID, rc, req, 2);
} }
private void executeInternal( private void executeInternal(String requestID, ResultCallback rc, ContractRequest req,
String requestID, ResultCallback rc, ContractRequest req, int count) { int count) {
// String contractID = req.getContractID(); // String contractID = req.getContractID();
// TODO 标注失效节点是否选择重新迁移 // TODO 标注失效节点是否选择重新迁移
ResultCallback cb = ResultCallback cb = new ResultCallback() {
new ResultCallback() { @Override
@Override public void onResult(String str) {
public void onResult(String str) { LOGGER.debug(str);
LOGGER.debug(str); JsonObject jo = JsonParser.parseString(str).getAsJsonObject();
JsonObject jo = JsonParser.parseString(str).getAsJsonObject(); jo.remove("action");
jo.remove("action"); jo.addProperty("action", "onExecuteResult");
jo.addProperty("action", "onExecuteResult"); LOGGER.debug(jo.toString());
LOGGER.debug(jo.toString()); if (jo.has("data")) {
if (jo.has("data")) { String data = jo.get("data").getAsString();
String data = jo.get("data").getAsString(); ContractResult cr = JsonUtil.fromJson(data, ContractResult.class);
ContractResult cr = JsonUtil.fromJson(data, ContractResult.class); if (cr.status != ContractResult.Status.Success && count > 0) {
if (cr.status != ContractResult.Status.Success && count > 0) { executeInternal(requestID, rc, req, count - 1);
executeInternal(requestID, rc, req, count - 1); } else
} else rc.onResult(jo.toString()); rc.onResult(jo.toString());
} else { } else {
JsonObject jo2 = new JsonObject(); JsonObject jo2 = new JsonObject();
jo2.addProperty("action", "onExecuteResult"); jo2.addProperty("action", "onExecuteResult");
jo.remove("action"); jo.remove("action");
jo2.addProperty("data", jo.toString()); jo2.addProperty("data", jo.toString());
rc.onResult(jo2.toString()); rc.onResult(jo2.toString());
} }
} }
}; };
masterServerTCPAction.getSync().sleepWithTimeout(requestID, cb, 5); masterServerTCPAction.getSync().sleepWithTimeout(requestID, cb, 5);
if (!sendOnce(requestID, req)) if (!sendOnce(requestID, req))
rc.onResult( rc.onResult(
@ -64,7 +65,8 @@ public class ResponseOnceExecutor extends AbstractContextContractExecutor {
} }
private boolean sendOnce(String requestID, ContractRequest req) { private boolean sendOnce(String requestID, ContractRequest req) {
String[] members = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).getMembers(); String[] members = cmActions.getManager().multiContractRecorder
.getMultiContractMeta(contractID).getMembers();
for (int i = 0; i < members.length; i++) { for (int i = 0; i < members.length; i++) {
int size = members.length; int size = members.length;
String nodeID = members[order.incrementAndGet() % size]; String nodeID = members[order.incrementAndGet() % size];

View File

@ -53,9 +53,14 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
request_index = new AtomicInteger(seq); request_index = new AtomicInteger(seq);
} }
public ResultCallback createResultCallback(final String requestID, final ResultCallback originalCb, final int count, final int request_seq, final String contractID, JoinInfo joinInfo) { public ResultCallback createResultCallback(final String requestID,
final ResultCallback originalCb, final int count, final int request_seq,
final String contractID, JoinInfo joinInfo) {
// TODO 加对应的超时 // TODO 加对应的超时
return networkManager.createResultCallback(requestID, new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq, contractID, joinInfo), count); // 把count改成了1设置成获得1个响应就行 return networkManager.createResultCallback(requestID,
new MultiPointCooperationExecutor.ResultMerger(originalCb, count, request_seq,
contractID, joinInfo),
count); // 把count改成了1设置成获得1个响应就行
} }
public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) { public void sendRequest(String id, ContractRequest req, String[] nodes, ResultCallback rc) {
@ -78,56 +83,65 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
if (node.equals(globalConf.getNodeID())) { if (node.equals(globalConf.getNodeID())) {
masterClientTCPAction.asyncExecuteContractLocally(jo, rc); masterClientTCPAction.asyncExecuteContractLocally(jo, rc);
} else { } else {
// LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " + "RequestAllExecutor 发送请求给 " + node.substring(0, 5)); // LOGGER.info("[sendRequests] get cmNode " + node.substring(0, 5) + " not null " +
// "RequestAllExecutor 发送请求给 " + node.substring(0, 5));
networkManager.sendToAgent(node, sendStr); networkManager.sendToAgent(node, sendStr);
} }
} }
} }
public static <T> T invokeFunctionWithoutLimit(String contractID, String funcName, Class<T> t, JsonElement... args) { public static <T> T invokeFunctionWithoutLimit(String contractID, String funcName, Class<T> t,
JsonElement... args) {
ContractClient client = cmActions.getManager().getClient(contractID); ContractClient client = cmActions.getManager().getClient(contractID);
JsonObject arg = new JsonObject(); JsonObject arg = new JsonObject();
arg.addProperty("funcName", funcName); arg.addProperty("funcName", funcName);
JsonArray funcArgs = new JsonArray(); JsonArray funcArgs = new JsonArray();
if (args != null && args.length > 0) for (JsonElement je : args) if (args != null && args.length > 0)
funcArgs.add(je); for (JsonElement je : args)
funcArgs.add(je);
arg.add("funcArgs", funcArgs); arg.add("funcArgs", funcArgs);
String routeResultStr = client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString()); String routeResultStr =
client.executeMethod("", "invokeFunctionWithoutLimit", arg.toString());
T routeResult = JsonUtil.fromJson(routeResultStr, t); T routeResult = JsonUtil.fromJson(routeResultStr, t);
return routeResult; return routeResult;
} }
private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req, String[] members) { private String[] getAccordingToRouteInfo(RouteInfo routeInfo, ContractRequest req,
String[] members) {
try { try {
int val; int val;
if (routeInfo.useDefault == null) { if (routeInfo.useDefault == null) {
// func myFunc (sourceArg, requester) // func myFunc (sourceArg, requester)
JsonElement requester = req.getRequester() == null ? JsonNull.INSTANCE : new JsonPrimitive(req.getRequester()); JsonElement requester = req.getRequester() == null ? JsonNull.INSTANCE
return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName, String[].class, req.getArg(), requester); : new JsonPrimitive(req.getRequester());
return invokeFunctionWithoutLimit(req.getContractID(), routeInfo.funcName,
String[].class, req.getArg(), requester);
} }
switch (routeInfo.useDefault) { switch (routeInfo.useDefault) {
case byRequester: case byRequester:
val = new BigInteger(req.getRequester(), 16).mod(new BigInteger("" + members.length)).intValue(); val = new BigInteger(req.getRequester(), 16)
.mod(new BigInteger("" + members.length)).intValue();
while (val < 0 && members.length > 0) { while (val < 0 && members.length > 0) {
val = val + members.length; val = val + members.length;
} }
return new String[]{members[val]}; return new String[] {members[val]};
case byArgHash: case byArgHash:
val = req.getArg().hashCode(); val = req.getArg().hashCode();
val = val % members.length; val = val % members.length;
while (val < 0 && members.length > 0) { while (val < 0 && members.length > 0) {
val += members.length; val += members.length;
} }
return new String[]{members[val]}; return new String[] {members[val]};
case byJsonPropHash: case byJsonPropHash:
JsonElement jo = tryLoadJsonProp(req, routeInfo.param); JsonElement jo = tryLoadJsonProp(req, routeInfo.param);
val = jo.toString().hashCode() % members.length; val = jo.toString().hashCode() % members.length;
while (val < 0 && members.length > 0) { while (val < 0 && members.length > 0) {
val += members.length; val += members.length;
} }
return new String[]{members[val]}; return new String[] {members[val]};
case byShardingIDTree: case byShardingIDTree:
return byShardingIDTree(members, meta.contract.shardingId, Integer.valueOf(routeInfo.param)); return byShardingIDTree(members, meta.contract.shardingId,
Integer.valueOf(routeInfo.param));
default: default:
return members; return members;
} }
@ -146,11 +160,12 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
System.arraycopy(members, from, ret, 0, to - from); System.arraycopy(members, from, ret, 0, to - from);
return ret; return ret;
} else } else
return new String[]{}; return new String[] {};
} else { } else {
childCount = Math.abs(childCount); childCount = Math.abs(childCount);
if (shardingId > 0) shardingId = (shardingId - 1) / childCount; if (shardingId > 0)
return new String[]{members[shardingId]}; shardingId = (shardingId - 1) / childCount;
return new String[] {members[shardingId]};
} }
} }
@ -161,7 +176,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
JsonObject arg; JsonObject arg;
if (req.getArg().isJsonPrimitive()) { if (req.getArg().isJsonPrimitive()) {
arg = JsonUtil.parseString(req.getArg().getAsString()).getAsJsonObject(); arg = JsonUtil.parseString(req.getArg().getAsString()).getAsJsonObject();
} else arg = req.getArg().getAsJsonObject(); } else
arg = req.getArg().getAsJsonObject();
if (!param.contains(".")) { if (!param.contains(".")) {
return arg.get(param); return arg.get(param);
} else { } else {
@ -183,21 +199,23 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
int validNode = 0; int validNode = 0;
for (String node : nodes) { for (String node : nodes) {
if (networkManager.hasAgentConnection(node)) { if (networkManager.hasAgentConnection(node)) {
//&& masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID) // && masterServerRecoverMechAction.getRecoverStatusMap().get(node).get(contractID)
// == RecoverFlag.Fine // == RecoverFlag.Fine
validNode++; validNode++;
} }
} }
int c = resultCount; int c = resultCount;
if (type == ContractExecType.Sharding) c = (int) Math.ceil((double) c / 2); if (type == ContractExecType.Sharding)
c = (int) Math.ceil((double) c / 2);
return validNode >= c; return validNode >= c;
} }
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rc, OnHashCallback hcb) { public void execute(String requestID, ContractRequest req, ResultCallback rc,
//LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req)); OnHashCallback hcb) {
// LOGGER.info("[MultiPointCooperationExecutor] execute " + JsonUtil.toJson(req));
// 获得action 函数名 // 获得action 函数名
// LOGGER.info("action is : " + req.getAction()); // LOGGER.info("action is : " + req.getAction());
req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID())); req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
if (requestID != null && requestID.endsWith("_mul")) { if (requestID != null && requestID.endsWith("_mul")) {
synchronized (lock) { synchronized (lock) {
@ -212,15 +230,17 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
req.seq = request_index.getAndIncrement(); req.seq = request_index.getAndIncrement();
} }
req.needSeq = true; req.needSeq = true;
String id = System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq; String id =
//LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id); System.currentTimeMillis() + "_" + (int) (Math.random() * 1000000) + "_" + req.seq;
// LOGGER.info("execute receive requestID= " + requestID + " msgID=" + id);
// 校验成功 current node num 合法 // 校验成功 current node num 合法
//LOGGER.info("checkCurNodeNumValid true"); // LOGGER.info("checkCurNodeNumValid true");
ContractMeta meta = cmActions.getManager().statusRecorder.getContractMeta(req.getContractID()); ContractMeta meta =
cmActions.getManager().statusRecorder.getContractMeta(req.getContractID());
FunctionDesp fun = meta.getExportedFunction(req.getAction()); FunctionDesp fun = meta.getExportedFunction(req.getAction());
ResultCallback collector; ResultCallback collector;
//Count 根据join规则来 // Count 根据join规则来
//nodes 根据route规则来 // nodes 根据route规则来
JoinInfo joinInfo = fun.joinInfo; JoinInfo joinInfo = fun.joinInfo;
RouteInfo routeInfo = fun.routeInfo; RouteInfo routeInfo = fun.routeInfo;
int count = getJoinCount(joinInfo, contractID, req); int count = getJoinCount(joinInfo, contractID, req);
@ -230,28 +250,35 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
if (nodes.length < count) { if (nodes.length < count) {
count = nodes.length; count = nodes.length;
} }
//LOGGER.info("requestID=" + requestID + " join Count: " + count + " nodeCount:" + nodes.length); // LOGGER.info("requestID=" + requestID + " join Count: " + count + " nodeCount:" +
// nodes.length);
if (count > 0) { if (count > 0) {
collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo); collector = createResultCallback(id, rc, count, req.seq, req.getContractID(), joinInfo);
// 初始化结果收集器 // 初始化结果收集器
masterServerTCPAction.getSync().sleep(id, collector); masterServerTCPAction.getSync().sleep(id, collector);
// LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq); // LOGGER.info("requestID=" + requestID + " master broadcasts request " + req.seq);
sendRequest(id, req, nodes, collector); // 发送请求 sendRequest(id, req, nodes, collector); // 发送请求
} else { } else {
ContractResult cr = new ContractResult(ContractResult.Status.Exception, new JsonPrimitive("broadcast 0")); ContractResult cr = new ContractResult(ContractResult.Status.Exception,
new JsonPrimitive("broadcast 0"));
rc.onResult(JsonUtil.toJson(cr)); rc.onResult(JsonUtil.toJson(cr));
} }
} }
private int getJoinCount(JoinInfo joinInfo, String contractID, ContractRequest contractRequest) { private int getJoinCount(JoinInfo joinInfo, String contractID,
ContractRequest contractRequest) {
try { try {
if (joinInfo == null) return resultCount; if (joinInfo == null)
return resultCount;
if (joinInfo.joinCountFuncName != null) { if (joinInfo.joinCountFuncName != null) {
Integer count = invokeFunctionWithoutLimit(contractID, joinInfo.joinCountFuncName, Integer.class, new JsonPrimitive(contractRequest.getRequester()), contractRequest.getArg()); Integer count = invokeFunctionWithoutLimit(contractID, joinInfo.joinCountFuncName,
Integer.class, new JsonPrimitive(contractRequest.getRequester()),
contractRequest.getArg());
return count; return count;
} }
if (joinInfo.joinCount != 0) return joinInfo.joinCount; if (joinInfo.joinCount != 0)
return joinInfo.joinCount;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -281,7 +308,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点 Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
JoinInfo joinInfo; JoinInfo joinInfo;
ResultMerger(final ResultCallback originalCb, final int count, final int request_seq, final String contractID, final JoinInfo joinInfo) { ResultMerger(final ResultCallback originalCb, final int count, final int request_seq,
final String contractID, final JoinInfo joinInfo) {
originalCallback = originalCb; originalCallback = originalCb;
this.count = count; this.count = count;
this.request_seq = request_seq; this.request_seq = request_seq;
@ -292,7 +320,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
} }
public String getInfo() { public String getInfo() {
return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order + " count=" + count + " "; return "contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + " order=" + order
+ " count=" + count + " ";
} }
@Override @Override
@ -301,13 +330,13 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
// str的data是个ContractResult // str的data是个ContractResult
// 在这儿也是返回个ContractResult // 在这儿也是返回个ContractResult
try { try {
//LOGGER.info(str); // LOGGER.info(str);
JsonObject obj = JsonParser.parseString(str).getAsJsonObject(); JsonObject obj = JsonParser.parseString(str).getAsJsonObject();
String id; String id;
if (obj.get("nodeID") == null) { if (obj.get("nodeID") == null) {
id = "null"; id = "null";
// LOGGER.info("已经收到第" + order + "节点的结果无节点ID 该结果被忽略" + order); // LOGGER.info("已经收到第" + order + "节点的结果无节点ID 该结果被忽略" + order);
return; return;
} else } else
id = obj.get("nodeID").getAsString(); id = obj.get("nodeID").getAsString();
@ -316,7 +345,8 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
return; return;
} }
nodeIDs.add(id); nodeIDs.add(id);
//LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + " order=" + order + " count=" + count); // LOGGER.info("contractID=" + contractID + " 收到第 " + order + " 个节点回复 : " + str + "
// order=" + order + " count=" + count);
componedContractResult.add(obj); componedContractResult.add(obj);
// 收集到所有结果 // 收集到所有结果
if (order.incrementAndGet() == count) { if (order.incrementAndGet() == count) {
@ -324,26 +354,28 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
finalResult.needSeq = true; finalResult.needSeq = true;
finalResult.seq = request_seq; finalResult.seq = request_seq;
// if (null == finalResult) { // if (null == finalResult) {
// finalResult = // finalResult =
// new ContractResult( // new ContractResult(
// ContractResult.Status.Exception, // ContractResult.Status.Exception,
// new JsonPrimitive( // new JsonPrimitive(
// "no nore than half of the // "no nore than half of the
// consistent result")); // consistent result"));
// originalCallback.onResult(new // originalCallback.onResult(new
// Gson().toJson(finalResult)); // Gson().toJson(finalResult));
// } else { // } else {
if (joinInfo != null) { if (joinInfo != null) {
handleJoinInfo(finalResult, joinInfo); handleJoinInfo(finalResult, joinInfo);
} }
originalCallback.onResult(JsonUtil.toJson(finalResult)); originalCallback.onResult(JsonUtil.toJson(finalResult));
// } // }
//LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " + finalResult.result); // LOGGER.info("本次执行最终结果为 " + finalResult.size + "个节点合并的,结果为 " +
// finalResult.result);
// 集群中事务序号+1 // 集群中事务序号+1
// MasterServerTCPAction.contractID2Members.get(contractID).nextSeq(); // MasterServerTCPAction.contractID2Members.get(contractID).nextSeq();
cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID).nextSeqAtMaster(); cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID)
.nextSeqAtMaster();
// recover其中无状态合约CP出错无需恢复 // recover其中无状态合约CP出错无需恢复
Set<String> nodesID = componedContractResult.getProblemNodes(); Set<String> nodesID = componedContractResult.getProblemNodes();
if (null == nodesID || nodesID.isEmpty()) { if (null == nodesID || nodesID.isEmpty()) {
@ -351,21 +383,25 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
} }
for (String nodeID : nodesID) { for (String nodeID : nodesID) {
LOGGER.info("结果出现问题的节点有:" + nodeID); LOGGER.info("结果出现问题的节点有:" + nodeID);
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) == RecoverFlag.Fine) { if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).put(contractID, RecoverFlag.ToRecover); .get(contractID) == RecoverFlag.Fine) {
masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
.put(contractID, RecoverFlag.ToRecover);
} }
} }
for (String nodeID : nodesID) { for (String nodeID : nodesID) {
if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID).get(contractID) == RecoverFlag.ToRecover) { if (masterServerRecoverMechAction.getRecoverStatusMap().get(nodeID)
.get(contractID) == RecoverFlag.ToRecover) {
LOGGER.info("问题节点开始恢复:" + nodeID); LOGGER.info("问题节点开始恢复:" + nodeID);
// 因为该节点结果有误所以即时是stableMode也认为trans记录不可信 // 因为该节点结果有误所以即时是stableMode也认为trans记录不可信
// 直接通过load别的节点来恢复 // 直接通过load别的节点来恢复
masterServerRecoverMechAction.restartContractFromCommonMode(nodeID, contractID); masterServerRecoverMechAction.restartContractFromCommonMode(nodeID,
contractID);
} }
} }
} }
// clearCache(); // clearCache();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
LOGGER.info("本次执行最终结果为有异常"); LOGGER.info("本次执行最终结果为有异常");
@ -378,7 +414,9 @@ public class MultiPointCooperationExecutor extends AbstractContextContractExecut
if (joinInfo != null) { if (joinInfo != null) {
if (joinInfo.useDefault == null) { if (joinInfo.useDefault == null) {
if (joinInfo.joinFuncName != null) { if (joinInfo.joinFuncName != null) {
JsonElement ret = MultiPointCooperationExecutor.invokeFunctionWithoutLimit(contractID, joinInfo.joinFuncName, JsonElement.class, jo); JsonElement ret = MultiPointCooperationExecutor
.invokeFunctionWithoutLimit(contractID, joinInfo.joinFuncName,
JsonElement.class, jo);
finalResult.result = ret; finalResult.result = ret;
} }
return; return;

View File

@ -43,22 +43,18 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
public SelfAdaptiveShardingExecutor(String contractID) { public SelfAdaptiveShardingExecutor(String contractID) {
this.meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); this.meta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
this.future = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay( this.future = Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(this::submitBlock,
this::submitBlock, DELAY, DELAY, TimeUnit.SECONDS);
DELAY,
DELAY,
TimeUnit.SECONDS);
Executors.newCachedThreadPool().execute(() -> { Executors.newCachedThreadPool().execute(() -> {
LOGGER.info(String.format( LOGGER.info(String.format("[Executor %s] starting executing service... %b",
"[Executor %s] starting executing service... %b",
meta.getContractID(), meta.isMaster())); meta.getContractID(), meta.isMaster()));
while (running) { while (running) {
LOGGER.info(String.format( LOGGER.info(String.format(
"[Executor %s] checking blocks to be executed, latest height=%s, to be executed size=%d", "[Executor %s] checking blocks to be executed, latest height=%s, to be executed size=%d",
meta.getContractID(), executorPointer, toExecuted.size())); meta.getContractID(), executorPointer, toExecuted.size()));
for (Block block = toExecuted.get(executorPointer); for (Block block =
null != block; toExecuted.get(executorPointer); null != block; executorPointer +=
executorPointer += 1, block = toExecuted.get(executorPointer)) { 1, block = toExecuted.get(executorPointer)) {
executeBlock(block); executeBlock(block);
toExecuted.remove(executorPointer); toExecuted.remove(executorPointer);
} }
@ -67,8 +63,7 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
executorFlag.wait(); executorFlag.wait();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOGGER.warn(String.format( LOGGER.warn(String.format("[Executor %s] waiting is interrupted: %s",
"[Executor %s] waiting is interrupted: %s",
meta.getContractID(), e.getMessage())); meta.getContractID(), e.getMessage()));
} }
} }
@ -85,7 +80,8 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
} }
@Override @Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { public void execute(String requestID, ContractRequest req, ResultCallback rcb,
OnHashCallback hcb) {
// check client // check client
ContractClient client = cmActions.getManager().getClient(meta.getContractID()); ContractClient client = cmActions.getManager().getClient(meta.getContractID());
if (null == client) { if (null == client) {
@ -97,10 +93,10 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
// check function // check function
FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction()); FunctionDesp funDesp = client.contractMeta.getExportedFunction(req.getAction());
if (null == funDesp) { if (null == funDesp) {
LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID() + " not found!"); LOGGER.warn("action " + req.getAction() + " of contract " + meta.getContractID()
+ " not found!");
rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error, rcb.onResult(JsonUtil.toJson(new ContractResult(ContractResult.Status.Error,
new JsonPrimitive(String.format( new JsonPrimitive(String.format("action %s of contract %s not found!",
"action %s of contract %s not found!",
req.getAction(), meta.getContractID()))))); req.getAction(), meta.getContractID())))));
return; return;
} }
@ -145,19 +141,12 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
Block block = JsonUtil.fromJson(blockStr, Block.class); Block block = JsonUtil.fromJson(blockStr, Block.class);
// the block must have not been cached or executed, and must be valid // the block must have not been cached or executed, and must be valid
boolean valid = block.isValid(); boolean valid = block.isValid();
if (!toExecuted.containsKey(block.height) && if (!toExecuted.containsKey(block.height) && block.height >= executorPointer && valid) {
block.height >= executorPointer &&
valid) {
// add block into block cache // add block into block cache
LOGGER.info(String.format( LOGGER.info(String.format(
"[Executor %s] receive block [%d] %s -> %s, %d transactions, timestamp=%d, size=%d", "[Executor %s] receive block [%d] %s -> %s, %d transactions, timestamp=%d, size=%d",
meta.getContractID(), meta.getContractID(), block.height, block.hash, block.prevHash,
block.height, block.requests.length, block.timestamp, blockStr.length()));
block.hash,
block.prevHash,
block.requests.length,
block.timestamp,
blockStr.length()));
toExecuted.put(block.height, block); toExecuted.put(block.height, block);
// notify thread to execute blocks // notify thread to execute blocks
synchronized (executorFlag) { synchronized (executorFlag) {
@ -165,10 +154,7 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
} }
} else { } else {
LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b valid %b", LOGGER.warn(String.format("receive block [%d] %s failed! checking: cached %b valid %b",
block.height, block.height, block.hash, toExecuted.containsKey(block.height), valid));
block.hash,
toExecuted.containsKey(block.height),
valid));
} }
} }
@ -177,25 +163,25 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
LOGGER.debug("start"); LOGGER.debug("start");
// check contract requests, requests must have not been executed // check contract requests, requests must have not been executed
for (ContractRequest request : block.requests) { for (ContractRequest request : block.requests) {
if (executedTxs.containsKey(request.getRequestID()) && executedTxs.get(request.getRequestID())) { if (executedTxs.containsKey(request.getRequestID())
&& executedTxs.get(request.getRequestID())) {
LOGGER.debug("find request " + request.getRequestID() + " has been executed!"); LOGGER.debug("find request " + request.getRequestID() + " has been executed!");
return; return;
} }
} }
// TODO check status // TODO check status
if (null != block.checkPoint && !block.checkPoint.isEmpty()) { if (null != block.checkPoint && !block.checkPoint.isEmpty()) {
cmActions.getManager().recoverUnitFromCheckPoint(meta.getContractID(), block.checkPoint); cmActions.getManager().recoverUnitFromCheckPoint(meta.getContractID(),
block.checkPoint);
} }
// executed requests // executed requests
for (ContractRequest request : block.requests) { for (ContractRequest request : block.requests) {
String ret = cmActions.getManager().executeLocally(request, null); String ret = cmActions.getManager().executeLocally(request, null);
LOGGER.debug(String.format( LOGGER.debug(String.format("[Executor %s] result of request %s: %s",
"[Executor %s] result of request %s: %s",
meta.getContractID(), request.getRequestID(), ret)); meta.getContractID(), request.getRequestID(), ret));
executedTxs.put(request.getRequestID(), true); executedTxs.put(request.getRequestID(), true);
} }
LOGGER.info(String.format( LOGGER.info(String.format("[Executor %s] execute %d transactions of block [%d] %s",
"[Executor %s] execute %d transactions of block [%d] %s",
meta.getContractID(), block.requests.length, block.height, block.hash)); meta.getContractID(), block.requests.length, block.height, block.hash));
} }
@ -212,16 +198,18 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
req.addProperty("contractID", this.meta.getContractID()); req.addProperty("contractID", this.meta.getContractID());
String reqStr = req.toString(); String reqStr = req.toString();
// deliver blocks // deliver blocks
LOGGER.info("[Executor " + meta.getContractID() + "] deliver block " + block.hash + "..."); LOGGER.info(
"[Executor " + meta.getContractID() + "] deliver block " + block.hash + "...");
String myself = cmActions.getManager().nodeCenterConn.getNodeId(); String myself = cmActions.getManager().nodeCenterConn.getNodeId();
this.onDeliverBlock(blockStr); this.onDeliverBlock(blockStr);
for (String node : nodes) { for (String node : nodes) {
// TODO: find dead lock here // TODO: find dead lock here
// if (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID()) // if
// == RecoverFlag.Fine) { // (MasterServerRecoverMechAction.recoverStatus.get(node).get(this.meta.getContractID())
// LOGGER.info("deliver block " + block.hash + " to node " + node); // == RecoverFlag.Fine) {
// NetworkManager.instance.sendToAgent(node, reqStr); // LOGGER.info("deliver block " + block.hash + " to node " + node);
// } // NetworkManager.instance.sendToAgent(node, reqStr);
// }
if (!Objects.equals(myself, node)) { if (!Objects.equals(myself, node)) {
networkManager.sendToAgent(node, reqStr); networkManager.sendToAgent(node, reqStr);
} }
@ -278,18 +266,19 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
public boolean isValid() { public boolean isValid() {
boolean hashValid = computeHash().equals(hash), boolean hashValid = computeHash().equals(hash),
// bodyValid = body.equals(merkle(this.requests)), // bodyValid = body.equals(merkle(this.requests)),
bodyValid = true, bodyValid = true, signValid = verifySignature();
signValid = verifySignature();
boolean ret = hashValid & bodyValid & signValid; boolean ret = hashValid & bodyValid & signValid;
if (!ret) { if (!ret) {
LOGGER.warn(String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid)); LOGGER.warn(
String.format("hash %b body %b sign %b", hashValid, bodyValid, signValid));
} }
return ret; return ret;
} }
private String computeHash() { private String computeHash() {
return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint, this.body); return HashUtil.sha3(String.valueOf(this.height), this.prevHash, this.checkPoint,
this.body);
} }
private String merkle(ContractRequest[] requests) { private String merkle(ContractRequest[] requests) {
@ -300,9 +289,8 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
if (requests.length == 1) { if (requests.length == 1) {
return HashUtil.sha3(requests[0].getRequestID()); return HashUtil.sha3(requests[0].getRequestID());
} }
Queue<String> merkleQueue = Queue<String> merkleQueue = Arrays.stream(requests).map(ContractRequest::getRequestID)
Arrays.stream(requests).map(ContractRequest::getRequestID) .collect(Collectors.toCollection(ArrayDeque::new));
.collect(Collectors.toCollection(ArrayDeque::new));
do { do {
int size; int size;
for (size = merkleQueue.size(); size > 1; size -= 2) { for (size = merkleQueue.size(); size > 1; size -= 2) {
@ -330,4 +318,4 @@ public class SelfAdaptiveShardingExecutor extends AbstractContextContractExecuto
return this.hash; return this.hash;
} }
} }
} }

View File

@ -72,8 +72,10 @@ public class MessageTest {
assert msg.getPrevLogTerm() == message.getPrevLogTerm(); assert msg.getPrevLogTerm() == message.getPrevLogTerm();
assert msg.getLeaderCommit() == message.getLeaderCommit(); assert msg.getLeaderCommit() == message.getLeaderCommit();
assert msg.getEntries().size() == message.getEntries().size(); assert msg.getEntries().size() == message.getEntries().size();
assert msg.getEntries().get(0).getCommand().getAction().equals(message.getEntries().get(0).getCommand().getAction()); assert msg.getEntries().get(0).getCommand().getAction()
assert msg.getEntries().get(9).getCommand().getAction().equals(message.getEntries().get(9).getCommand().getAction()); .equals(message.getEntries().get(0).getCommand().getAction());
assert msg.getEntries().get(9).getCommand().getAction()
.equals(message.getEntries().get(9).getCommand().getAction());
} }
@Test @Test

View File

@ -10,7 +10,8 @@ import java.util.List;
public class LogManagerTest { public class LogManagerTest {
@Test @Test
public void testLogManager() { public void testLogManager() {
LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); LogManager logManager =
new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000);
List<LogEntry> entries = new ArrayList<>(); List<LogEntry> entries = new ArrayList<>();
LogEntry entry; LogEntry entry;
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -28,7 +29,8 @@ public class LogManagerTest {
@Test @Test
public void testLogManager2() { public void testLogManager2() {
LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); LogManager logManager =
new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000);
LogEntry entry = logManager.getEntry(386); LogEntry entry = logManager.getEntry(386);
assert entry.getTerm() == 38; assert entry.getTerm() == 38;
assert entry.getIndex() == 386; assert entry.getIndex() == 386;
@ -54,9 +56,11 @@ public class LogManagerTest {
long index = logManager.append(entries); long index = logManager.append(entries);
assert index == 1099; assert index == 1099;
} }
@Test @Test
public void testLogManager3() { public void testLogManager3() {
LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); LogManager logManager =
new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000);
List<LogEntry> entryList = logManager.getEntriesStartAt(677); List<LogEntry> entryList = logManager.getEntriesStartAt(677);
assert entryList.size() == 1100 - 677; assert entryList.size() == 1100 - 677;
assert entryList.get(100).getIndex() == 777; assert entryList.get(100).getIndex() == 777;