This commit is contained in:
CaiHQ 2022-05-27 17:47:04 +08:00
parent e10866ff8b
commit b468f5947f
22 changed files with 1933 additions and 2 deletions

View File

@ -171,7 +171,6 @@ public class PBFTExecutor extends AbstractContextContractExecutor {
return; return;
} }
req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID())); req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID()));
// 三个相同requestID进来的时候会有冲突 // 三个相同requestID进来的时候会有冲突
// 仅在此处有冲突么 // 仅在此处有冲突么
// 这里是从MasterServer->MasterClient请求的是"executeContractLocally" // 这里是从MasterServer->MasterClient请求的是"executeContractLocally"

View File

@ -218,7 +218,7 @@ public class RequestAllExecutor extends AbstractContextContractExecutor {
ResultCallback originalCallback; ResultCallback originalCallback;
Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点 Set<String> nodeIDs = new HashSet<>(); // 已收到返回结果的节点
ResultMerger( public ResultMerger(
final ResultCallback originalCb, final ResultCallback originalCb,
final int count, final int count,
final int request_seq, final int request_seq,

View File

@ -0,0 +1,111 @@
package org.bdware.consistency.plugin.raft;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.common.AbstractContextContractExecutor;
import org.bdware.consistency.plugin.pbft.ContractCluster;
import org.bdware.consistency.plugin.ra.RequestAllExecutor;
import org.bdware.consistency.plugin.raft.algo.RaftAlgorithm;
import org.bdware.consistency.plugin.raft.algo.RaftConfig;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.conn.OnHashCallback;
import org.bdware.sc.conn.ResultCallback;
import org.bdware.sc.consistency.Committer;
import org.bdware.sc.units.MultiContractMeta;
import org.bdware.sc.units.PubKeyNode;
import org.bdware.sc.util.JsonUtil;
import org.zz.gmhelper.SM2KeyPair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class RAFTExecutor extends AbstractContextContractExecutor {
private static final Logger LOGGER = LogManager.getLogger(RAFTExecutor.class);
private final ContractCluster contractCluster;
RaftAlgorithm raft;
public RAFTExecutor(String contractID, String[] memberPubKeys) {
RaftConfig config = new RaftConfig();
PubKeyNode selfNode = new PubKeyNode();
selfNode.pubkey = globalConf.getNodeID();
List<PubKeyNode> members = new ArrayList<>();
for (String memberPubKey : memberPubKeys) {
PubKeyNode pubkeyNode = new PubKeyNode();
pubkeyNode.pubkey = memberPubKey;
members.add(pubkeyNode);
}
contractCluster = new ContractCluster(contractID, members);
final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID);
raft = new RaftAlgorithm(config, selfNode, contractCluster, new Committer() {
@Override
public void onCommit(ContractRequest request) {
ResultCallback ret = null;
final long startTime = System.currentTimeMillis();
ret = new ResultCallback() {
@Override
public void onResult(String str) {
Map<String, String> ret = new HashMap<>();
ret.put("action", "receiveTrustfullyResult");
SM2KeyPair keyPair = globalConf.getKeyPair();
ret.put("nodeID", keyPair.getPublicKeyStr());
ret.put("responseID", request.getRequestID());
ret.put("executeTime", (System.currentTimeMillis() - startTime) + "");
ret.put("data", str);
cei.setLastExeSeq(request.seq);
networkManager.sendToAgent(raft.getLeader().pubkey, JsonUtil.toJson(ret));
}
};
cmActions.getManager().executeLocallyAsync(request, ret, null);
}
});
//TODO 在改变角色时需要通知node cluster
if (cei.isMaster()) {
raft.convertToLeader();
} else {
raft.convertToFollower();
}
}
@Override
public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) {
LOGGER.debug(JsonUtil.toJson(req));
MultiContractMeta multiContractMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID());
if (multiContractMeta == null || !multiContractMeta.isMaster()) {
cmActions.getManager().executeContractOnOtherNodes(req, rcb);
return;
}
ResultCallback collector;
int count = contractCluster.getNodes().size();
collector = networkManager.createResultCallback(
requestID, new RequestAllExecutor.ResultMerger(rcb, count, req.seq, req.getContractID()), count);
masterServerTCPAction.getSync().sleep(requestID, collector);
raft.insertLogEntry(req);
}
@Override
public void onRecover(Map<String, Object> args) {
super.onRecover(args);
}
@Override
public void onDeliverBlock(String data) {
super.onDeliverBlock(data);
}
@Override
public void close() {
super.close();
}
@Override
public void onSyncMessage(Node node, byte[] data) {
raft.onMessage(node, data);
}
}

View File

@ -0,0 +1,22 @@
package org.bdware.consistency.plugin.raft;
import org.bdware.consistency.plugin.pbft.PBFTExecutor;
import org.bdware.sdk.consistency.api.ContractExecutorFactory;
import org.bdware.server.trustedmodel.ContractExecutor;
import java.util.Map;
public class RAFTExecutorFactory implements ContractExecutorFactory {
@Override
public String getExecutorName() {
return "RAFT";
}
@Override
public ContractExecutor getInstance(Map<String, Object> args) {
// int nodeSize = (int) args.get("nodeSize");
String contractID = (String) args.get("contractID");
String[] memberPubKeys = (String[]) args.get("members");
return new RAFTExecutor(contractID, memberPubKeys);
}
}

View File

@ -0,0 +1,413 @@
package org.bdware.consistency.plugin.raft.algo;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.pbft.ContractCluster;
import org.bdware.consistency.plugin.raft.algo.message.*;
import org.bdware.consistency.plugin.raft.algo.storage.LogEntry;
import org.bdware.consistency.plugin.raft.algo.storage.LogManager;
import org.bdware.consistency.plugin.raft.algo.storage.StateManager;
import org.bdware.consistency.plugin.raft.algo.util.HeartBeatUtil;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.Node;
import org.bdware.sc.consistency.CommitAlgorithm;
import org.bdware.sc.consistency.Committer;
import org.bdware.sc.units.PubKeyNode;
import org.bdware.sc.units.TrustfulExecutorConnection;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class RaftAlgorithm implements CommitAlgorithm {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(RaftAlgorithm.class);
private ContractCluster connection;
Committer committer;
private enum State {LEADER, FOLLOWER, CANDIDATE}
State state;
PubKeyNode leader;
final PubKeyNode self;
List<PubKeyNode> servers;
StateManager stateManager;
// Persistent state on all servers
long currentTerm;
PubKeyNode votedFor;
LogManager logManager;
// Volatile state on all servers
long commitIndex;
long lastApplied;
// Volatile state on leaders
Map<String, Long> nextIndex;
Map<String, Long> matchIndex;
int grantedVoteCount;
// electionTimeout
TimerTask electionTimerTask;
TimerTask heartbeatTimerTask;
long lastActivated;
//config
RaftConfig config;
// Locks and Conditions
private Lock lock = new ReentrantLock();
public RaftAlgorithm(RaftConfig config, PubKeyNode node, ContractCluster conn, Committer committer) {
this.config = config;
self = node;
connection = conn;
this.committer = committer;
state = State.FOLLOWER;
leader = null;
servers = connection.getNodes();
currentTerm = StateManager.loadCurrentTerm();
votedFor = StateManager.loadVotedFor();
logManager = new LogManager(config.raftLogDir, config.maxSegmentFileSize);
commitIndex = 0;
lastApplied = 0;
electionTimerTask = new TimerTask() {
@Override
public void run() {
long cur = System.currentTimeMillis();
if (cur - lastActivated > config.getElectionTimeout()) {
startElection();
}
}
};
heartbeatTimerTask = new TimerTask() {
@Override
public void run() {
lastActivated = System.currentTimeMillis();
broadcastAppendEntries();
}
};
resetElectionTimer();
}
public boolean isLeader() {
return state == State.LEADER;
}
@Override
public void onMessage(Node sender, byte[] msg) {
if (((PubKeyNode) sender).pubkey.equals(self.pubkey)) {
LOGGER.info("IGNORE SELF:" + msg.length);
return;
}
LOGGER.info("RECEIVE:" + msg.length);
if (!isServerValid((PubKeyNode) sender)) {
LOGGER.debug("message from an invalid server ({})!", sender.toString());
return;
}
IRaftMessage message = RaftMessageParser.parse(msg);
LOGGER.info("RECEIVE:" + message.getType());
lock.lock();
try {
IRaftMessage response;
switch (message.getType()) {
case RequestVote:
response = onRequestVote((RequestVote) message);
persist();
connection.sendMessage((PubKeyNode) sender, new RaftMessageParser(response).getBytes());
break;
case RequestVoteResponse:
onRequestVoteResponse((RequestVoteResponse) message);
break;
case AppendEntries:
response = onAppendEntries((AppendEntries) message);
persist();
connection.sendMessage((PubKeyNode) sender, new RaftMessageParser(response).getBytes());
break;
case AppendEntriesResponse:
onAppendEntriesResponse((PubKeyNode) sender, (AppendEntriesResponse) message);
break;
case Unknown:
break;
default:
throw new IllegalStateException("Unexpected value: " + message.getType());
}
} finally {
lock.unlock();
}
}
// in lock
private RequestVoteResponse onRequestVote(RequestVote request) {
if (request.getTerm() < currentTerm) {
LOGGER.debug("Rejected RequestVote message in term {}" +
", because the message is stale. My term is {}.", request.getTerm(), currentTerm);
return new RequestVoteResponse(currentTerm, false);
}
if (request.getTerm() == currentTerm) {
if (votedFor != null && !votedFor.pubkey.equals(request.getCandidate().pubkey)) {
LOGGER.debug("Rejected RequestVote message in term {}" +
", because I have voted for another server. ", request.getTerm());
return new RequestVoteResponse(currentTerm, false);
}
} else {
currentTerm = request.getTerm();
leader = null;
votedFor = null;
convertToFollower();
}
if (!isUpToDateThanMyLog(request.getLastLogTerm(), request.getLastLogIndex())) {
LOGGER.debug("Rejected RequestVote message, because my log is more up-to-date. ");
return new RequestVoteResponse(currentTerm, false);
}
// (request.getTerm() == currentTerm && votedFor == null) || request.getTerm() > currentTerm
// || request server's log is more up-to-date than mine
votedFor = request.getCandidate();
LOGGER.info("Granted RequestVote message from server (pubKey={}) in term {}",
request.getCandidate().pubkey, currentTerm);
return new RequestVoteResponse(currentTerm, true);
}
// in lock
private void onRequestVoteResponse(RequestVoteResponse response) {
if (response.getTerm() > currentTerm) {
currentTerm = response.getTerm();
leader = null;
votedFor = null;
convertToFollower();
} else if (state == State.CANDIDATE && response.getTerm() == currentTerm && response.isGranted()) {
++grantedVoteCount;
if (grantedVoteCount > servers.size() / 2) {
convertToLeader();
LOGGER.info("Get votes from a majority. Convert to leader.");
}
}
}
// in lock
private AppendEntriesResponse onAppendEntries(AppendEntries request) {
LogEntry lastEntry = logManager.getLastEntry();
if (request.getTerm() < currentTerm) {
LOGGER.debug("Rejected AppendEntries message in term {}" +
", because the message is stale. My term is {}.", request.getTerm(), currentTerm);
return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self);
}
if (request.getTerm() > currentTerm) {
currentTerm = request.getTerm();
leader = null;
votedFor = null;
convertToFollower();
}
if (leader == null) {
leader = request.getLeader();
LOGGER.info("new leader={}", leader.pubkey);
}
if (!leader.pubkey.equals(request.getLeader().pubkey)) {
LOGGER.warn("Another peer={} declares that it is the leader " +
"at term={} which was occupied by leader={}",
request.getLeader().pubkey, request.getTerm(), leader.pubkey);
return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self);
}
resetElectionTimer();
lastActivated = System.currentTimeMillis();
LogEntry entry = logManager.getEntry(request.getPrevLogIndex());
if (entry == null) {
return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self);
}
if (entry.getTerm() != request.getTerm() || logManager.getLastEntryIndex() > request.getPrevLogIndex()) {
logManager.deleteEntriesStartAt(request.getPrevLogIndex());
lastEntry = logManager.getLastEntry();
}
List<LogEntry> newEntries = request.getEntries();
if (newEntries.size() == 0) {
return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), true, self);
}
logManager.append(newEntries);
lastEntry = logManager.getLastEntry();
if (request.getLeaderCommit() > commitIndex) {
commitIndex = Math.min(request.getLeaderCommit(), newEntries.get(newEntries.size() - 1).getIndex());
}
LOGGER.info("RECEIVE APPENDENTRY: commitIndex=" + commitIndex + " lastApplied:" + lastApplied);
while (commitIndex > lastApplied) {
applyToStateMachine(lastApplied + 1);
lastApplied++;
}
return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), true, self);
}
// in lock
private void onAppendEntriesResponse(PubKeyNode sender, AppendEntriesResponse response) {
if (response.getTerm() > currentTerm) {
currentTerm = response.getTerm();
leader = null;
votedFor = null;
convertToFollower();
} else if (state == State.LEADER && response.getTerm() == currentTerm) {
if (response.isSuccess()) {
if (matchIndex.containsKey(sender.pubkey) && matchIndex.get(sender.pubkey) > response.getLastLogIndex()) {
LOGGER.warn("node={} matchIndex decrease! prev matchIndex={}, lastLogIndex={}",
sender.pubkey, matchIndex.get(sender.pubkey), response.getLastLogIndex());
throw new RuntimeException("matchIndex error");
}
matchIndex.put(sender.pubkey, response.getLastLogIndex());
}
nextIndex.put(sender.pubkey, response.getLastLogIndex() + 1);
}
}
// todo
private void applyToStateMachine(long logIndex) {
LogEntry entry = logManager.getEntry(logIndex);
LOGGER.info("Commit: requestID = " + entry.getCommand().getRequestID());
committer.onCommit(entry.getCommand());
}
public void insertLogEntry(ContractRequest req) {
LogEntry entry = new LogEntry();
entry.setCommand(req);
entry.setIndex(logManager.getLastEntryIndex() + 1);
entry.setTerm(currentTerm);
logManager.append(entry);
broadcastAppendEntries();
}
private void broadcastAppendEntries() {
lock.lock();
try {
LogEntry lastEntry = logManager.getLastEntry();
long lastLogIndex = lastEntry.getIndex();
nextIndex.forEach((pubKey, index) -> {
PubKeyNode node = new PubKeyNode();
node.pubkey = pubKey;
if (lastLogIndex >= index) {
sendAppendEntries(node, index);
} else {
AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), lastEntry.getTerm(), commitIndex, null);
connection.sendMessage(node, new RaftMessageParser(req).getBytes());
}
});
List<Long> indexList = new ArrayList<>(matchIndex.values());
Collections.sort(indexList);
long majorityIndex = indexList.get((indexList.size() - 1) / 2);
LogEntry entry = logManager.getEntry(majorityIndex);
if (majorityIndex > commitIndex && entry.getTerm() == currentTerm) {
commitIndex = majorityIndex;
}
while (commitIndex > lastApplied) {
applyToStateMachine(lastApplied + 1);
lastApplied++;
}
} finally {
lock.unlock();
}
}
private void sendAppendEntries(PubKeyNode follower, long logIndex) {
LogEntry prevLog = logManager.getEntry(logIndex - 1);
List<LogEntry> entries = logManager.getEntriesStartAt(logIndex);
AppendEntries request = new AppendEntries(currentTerm, leader, prevLog.getIndex(), prevLog.getTerm(), commitIndex, entries);
connection.sendMessage(follower, new RaftMessageParser(request).getBytes());
}
private void persist() {
StateManager.saveCurrentTerm(currentTerm);
StateManager.saveVotedFor(votedFor);
}
public void convertToFollower() {
if (state == State.FOLLOWER) {
return;
}
state = State.FOLLOWER;
stopHeartbeat();
}
// in lock
public void convertToLeader() {
state = State.LEADER;
leader = self;
nextIndex = new HashMap<>();
matchIndex = new HashMap<>();
startHeartbeat();
LogEntry lastEntry = logManager.getLastEntry();
for (PubKeyNode server : servers) {
if (server != null) {
if (lastEntry == null)
nextIndex.put(server.pubkey, 1L);
else
nextIndex.put(server.pubkey, lastEntry.getIndex() + 1);
matchIndex.put(server.pubkey, 0L);
// AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), lastEntry.getTerm(), commitIndex, null);
// connection.sendMessage(server,new RaftMessageParser(req).getBytes());
}
}
}
private void stopHeartbeat() {
HeartBeatUtil.getInstance().cancel(heartbeatTimerTask);
}
private void startHeartbeat() {
HeartBeatUtil.getInstance().schedule(heartbeatTimerTask, 0, config.heartbeatTimeout);
}
private void startElection() {
RequestVote request;
lock.lock();
try {
++currentTerm;
state = State.CANDIDATE;
votedFor = self;
grantedVoteCount = 1;
LogEntry lastLog = logManager.getLastEntry();
request = new RequestVote(currentTerm, self, lastLog.getIndex(), lastLog.getTerm());
} finally {
lock.unlock();
}
connection.broadcast(request.getBytes());
}
private void resetElectionTimer() {
HeartBeatUtil.getInstance().cancel(electionTimerTask);
HeartBeatUtil.getInstance().schedule(electionTimerTask, config.getElectionTimeout(), 1000);
}
private boolean isServerValid(PubKeyNode server) {
if (server == null) return false;
List<PubKeyNode> nodes = connection.getNodes();
for (PubKeyNode node : nodes) {
if (node.pubkey.equals(server.pubkey)) {
return true;
}
}
return false;
}
private boolean isUpToDateThanMyLog(long lastLogTerm, long lastLogIndex) {
LogEntry myLastLog = logManager.getLastEntry();
long myLastLogIndex = myLastLog.getIndex();
long myLastLogTerm = myLastLog.getTerm();
return lastLogTerm > myLastLogTerm
|| (lastLogTerm == myLastLogTerm && lastLogIndex >= myLastLogIndex);
}
@Override
public void setCommitter(Committer c) {
committer = c;
}
@Override
public void setConnection(TrustfulExecutorConnection c) {
connection = (ContractCluster) c;
}
public PubKeyNode getLeader() {
return leader;
}
}

View File

@ -0,0 +1,12 @@
package org.bdware.consistency.plugin.raft.algo;
public class RaftConfig {
public String raftLogDir = "./ContractDB/RAFTAll/";
public int maxSegmentFileSize = 100000000;
private int electionTimeout = 90000; // unit: millisecond
public int heartbeatTimeout = 30000; // unit: millisecond
public int getElectionTimeout() {
return electionTimeout;
}
}

View File

@ -0,0 +1,146 @@
package org.bdware.consistency.plugin.raft.algo.message;
import org.bdware.consistency.plugin.raft.algo.storage.LogEntry;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.units.PubKeyNode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class AppendEntries implements IRaftMessage {
RaftMessageType type;
long term; // leader's term
PubKeyNode leader; // leader's id
long prevLogIndex;
long prevLogTerm;
long leaderCommit; // leader's commitIndex
List<LogEntry> entries;
public AppendEntries() {
this.type = RaftMessageType.AppendEntries;
}
public AppendEntries(long term, PubKeyNode leader, long prevLogIndex, long prevLogTerm, long leaderCommit, List<LogEntry> entries) {
this.type = RaftMessageType.AppendEntries;
this.term = term;
this.leader = leader;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.leaderCommit = leaderCommit;
this.entries = entries;
}
@Override
public byte[] getBytes() {
ByteArrayOutputStream msg = new ByteArrayOutputStream();
try {
ByteUtil.writeLong(msg, term);
if (leader != null && !leader.pubkey.equals("")) {
byte[] pubkey = leader.pubkey.getBytes();
ByteUtil.writeInt(msg, pubkey.length);
msg.write(pubkey);
} else {
ByteUtil.writeInt(msg, 0);
}
ByteUtil.writeLong(msg, prevLogIndex);
ByteUtil.writeLong(msg, prevLogTerm);
ByteUtil.writeLong(msg, leaderCommit);
if (entries != null) {
ByteUtil.writeInt(msg, entries.size());
for (LogEntry entry : entries) {
byte[] entryBytes = entry.getBytes();
ByteUtil.writeInt(msg, entryBytes.length);
msg.write(entryBytes);
}
} else {
ByteUtil.writeInt(msg, 0);
}
} catch (IOException e) {
e.printStackTrace();
}
return msg.toByteArray();
}
@Override
public IRaftMessage parse(byte[] b) {
AppendEntries msg = new AppendEntries();
ByteArrayInputStream bi = new ByteArrayInputStream(b);
msg.term = ByteUtil.readLong(bi);
int leaderLen = ByteUtil.readInt(bi);
PubKeyNode node = new PubKeyNode();
if (leaderLen > 0) {
node.pubkey = new String(ByteUtil.readBytes(bi, leaderLen));
}
msg.leader = node;
msg.prevLogIndex = ByteUtil.readLong(bi);
msg.prevLogTerm = ByteUtil.readLong(bi);
msg.leaderCommit = ByteUtil.readLong(bi);
int entriesLen = ByteUtil.readInt(bi);
List<LogEntry> list = new ArrayList<>();
if (entriesLen > 0) {
for (int i = 0; i < entriesLen; i++) {
int len = ByteUtil.readInt(bi);
byte[] bytes = ByteUtil.readBytes(bi, len);
LogEntry entry = LogEntry.parse(bytes);
list.add(entry);
}
}
msg.entries = list;
return msg;
}
public RaftMessageType getType() {
return type;
}
public long getTerm() {
return term;
}
public void setTerm(long term) {
this.term = term;
}
public PubKeyNode getLeader() {
return leader;
}
public void setLeader(PubKeyNode leader) {
this.leader = leader;
}
public long getPrevLogIndex() {
return prevLogIndex;
}
public void setPrevLogIndex(long prevLogIndex) {
this.prevLogIndex = prevLogIndex;
}
public long getPrevLogTerm() {
return prevLogTerm;
}
public void setPrevLogTerm(long prevLogTerm) {
this.prevLogTerm = prevLogTerm;
}
public long getLeaderCommit() {
return leaderCommit;
}
public void setLeaderCommit(long leaderCommit) {
this.leaderCommit = leaderCommit;
}
public List<LogEntry> getEntries() {
return entries;
}
public void setEntries(List<LogEntry> entries) {
this.entries = entries;
}
}

View File

@ -0,0 +1,103 @@
package org.bdware.consistency.plugin.raft.algo.message;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.units.PubKeyNode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class AppendEntriesResponse implements IRaftMessage {
RaftMessageType type;
long term;
long lastLogIndex;
boolean success;
PubKeyNode server;
public AppendEntriesResponse() {
type = RaftMessageType.AppendEntriesResponse;
}
public AppendEntriesResponse(long term, long lastLogIndex, boolean success, PubKeyNode server) {
type = RaftMessageType.AppendEntriesResponse;
this.term = term;
this.lastLogIndex = lastLogIndex;
this.success = success;
this.server = server;
}
@Override
public byte[] getBytes() {
ByteArrayOutputStream msg = new ByteArrayOutputStream();
ByteUtil.writeLong(msg, term);
ByteUtil.writeLong(msg, lastLogIndex);
ByteUtil.writeBoolean(msg, success);
try {
if (server != null && !server.pubkey.equals("")) {
byte[] pubkey = server.pubkey.getBytes();
ByteUtil.writeInt(msg, pubkey.length);
msg.write(pubkey);
} else {
ByteUtil.writeInt(msg, 0);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return msg.toByteArray();
}
@Override
public IRaftMessage parse(byte[] b) {
AppendEntriesResponse msg = new AppendEntriesResponse();
ByteArrayInputStream bi = new ByteArrayInputStream(b);
msg.term = ByteUtil.readLong(bi);
msg.lastLogIndex = ByteUtil.readLong(bi);
msg.success = ByteUtil.readBoolean(bi);
int serverLen = ByteUtil.readInt(bi);
PubKeyNode node = new PubKeyNode();
if (serverLen > 0) {
node.pubkey = new String(ByteUtil.readBytes(bi, serverLen));
}
msg.server = node;
return msg;
}
@Override
public RaftMessageType getType() {
return type;
}
public long getTerm() {
return term;
}
public void setTerm(long term) {
this.term = term;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public PubKeyNode getServer() {
return server;
}
public void setServer(PubKeyNode server) {
this.server = server;
}
public long getLastLogIndex() {
return lastLogIndex;
}
public void setLastLogIndex(long lastLogIndex) {
this.lastLogIndex = lastLogIndex;
}
}

View File

@ -0,0 +1,28 @@
package org.bdware.consistency.plugin.raft.algo.message;
public class EmptyMessage implements IRaftMessage{
RaftMessageType type;
public EmptyMessage() {
type=RaftMessageType.Unknown;
}
@Override
public byte[] getBytes() {
return new byte[0];
}
@Override
public IRaftMessage parse(byte[] b) {
return new EmptyMessage();
}
@Override
public RaftMessageType getType() {
return type;
}
public void setType(RaftMessageType type) {
this.type = type;
}
}

View File

@ -0,0 +1,10 @@
package org.bdware.consistency.plugin.raft.algo.message;
public interface IRaftMessage {
RaftMessageType getType();
byte[] getBytes();
IRaftMessage parse(byte[] b);
}

View File

@ -0,0 +1,61 @@
package org.bdware.consistency.plugin.raft.algo.message;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bdware.sc.conn.ByteUtil;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class RaftMessageParser {
IRaftMessage message;
static Logger LOGGER = LogManager.getLogger(RaftMessageParser.class);
public RaftMessageParser(IRaftMessage message) {
this.message = message;
}
public byte[] getBytes() {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
try {
ByteUtil.writeInt(bo, message.getType().toInt());
byte[] bytes = message.getBytes();
ByteUtil.writeInt(bo, bytes.length);
bo.write(bytes);
LOGGER.info("[SEND] " + message.getType());
} catch (IOException e) {
e.printStackTrace();
}
return bo.toByteArray();
}
public static IRaftMessage parse(byte[] b) {
ByteArrayInputStream bi = new ByteArrayInputStream(b);
RaftMessageType type = RaftMessageType.fromInt(ByteUtil.readInt(bi));
int msgLen = ByteUtil.readInt(bi);
byte[] messageBytes = ByteUtil.readBytes(bi, msgLen);
IRaftMessage msg;
switch (type) {
case RequestVote:
msg = new RequestVote();
break;
case RequestVoteResponse:
msg = new RequestVoteResponse();
break;
case AppendEntries:
msg = new AppendEntries();
break;
case AppendEntriesResponse:
msg = new AppendEntriesResponse();
break;
default:
msg = new EmptyMessage();
}
return msg.parse(messageBytes);
}
public static RaftMessageParser setMessage(IRaftMessage message) {
return new RaftMessageParser(message);
}
}

View File

@ -0,0 +1,35 @@
package org.bdware.consistency.plugin.raft.algo.message;
public enum RaftMessageType {
RequestVote(0),
RequestVoteResponse(1),
AppendEntries(2),
AppendEntriesResponse(3),
Unknown(4);
private final int type;
RaftMessageType(int i) {
type = i;
}
public static RaftMessageType fromInt(int i) {
switch (i) {
case 0:
return RequestVote;
case 1:
return RequestVoteResponse;
case 2:
return AppendEntries;
case 3:
return AppendEntriesResponse;
default:
return Unknown;
}
}
public int toInt() {
return type;
}
}

View File

@ -0,0 +1,101 @@
package org.bdware.consistency.plugin.raft.algo.message;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.units.PubKeyNode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class RequestVote implements IRaftMessage {
RaftMessageType type;
long term;
PubKeyNode candidate; // candidate's Id
long lastLogIndex;
long lastLogTerm;
public RequestVote() {
type = RaftMessageType.RequestVote;
}
public RequestVote(long term, PubKeyNode candidate, long lastLogIndex, long lastLogTerm) {
type = RaftMessageType.RequestVote;
this.term = term;
this.candidate = candidate;
this.lastLogIndex = lastLogIndex;
this.lastLogTerm = lastLogTerm;
}
@Override
public byte[] getBytes() {
ByteArrayOutputStream msg = new ByteArrayOutputStream();
try {
ByteUtil.writeLong(msg, term);
if (candidate != null && !candidate.pubkey.equals("")) {
byte[] pubkey = candidate.pubkey.getBytes();
ByteUtil.writeInt(msg, pubkey.length);
msg.write(pubkey);
} else {
ByteUtil.writeInt(msg, 0);
}
ByteUtil.writeLong(msg, lastLogIndex);
ByteUtil.writeLong(msg, lastLogTerm);
} catch (IOException e) {
e.printStackTrace();
}
return msg.toByteArray();
}
@Override
public IRaftMessage parse(byte[] b) {
RequestVote msg = new RequestVote();
ByteArrayInputStream bi = new ByteArrayInputStream(b);
msg.term = ByteUtil.readLong(bi);
int candidateLen = ByteUtil.readInt(bi);
PubKeyNode node = new PubKeyNode();
if (candidateLen > 0) {
node.pubkey = new String(ByteUtil.readBytes(bi, candidateLen));
}
msg.candidate = node;
msg.lastLogIndex = ByteUtil.readLong(bi);
msg.lastLogTerm = ByteUtil.readLong(bi);
return msg;
}
@Override
public RaftMessageType getType() {
return type;
}
public long getTerm() {
return term;
}
public void setTerm(long term) {
this.term = term;
}
public PubKeyNode getCandidate() {
return candidate;
}
public void setCandidate(PubKeyNode candidate) {
this.candidate = candidate;
}
public long getLastLogIndex() {
return lastLogIndex;
}
public void setLastLogIndex(long lastLogIndex) {
this.lastLogIndex = lastLogIndex;
}
public long getLastLogTerm() {
return lastLogTerm;
}
public void setLastLogTerm(long lastLogTerm) {
this.lastLogTerm = lastLogTerm;
}
}

View File

@ -0,0 +1,64 @@
package org.bdware.consistency.plugin.raft.algo.message;
import org.bdware.sc.conn.ByteUtil;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
public class RequestVoteResponse implements IRaftMessage {
RaftMessageType type;
long term;
boolean granted;
public RequestVoteResponse() {
type = RaftMessageType.RequestVoteResponse;
}
public RequestVoteResponse(long term, boolean granted) {
type = RaftMessageType.RequestVoteResponse;
this.term = term;
this.granted = granted;
}
@Override
public byte[] getBytes() {
ByteArrayOutputStream msg = new ByteArrayOutputStream();
ByteUtil.writeLong(msg, term);
ByteUtil.writeBoolean(msg, granted);
return msg.toByteArray();
}
@Override
public IRaftMessage parse(byte[] b) {
RequestVoteResponse msg = new RequestVoteResponse();
ByteArrayInputStream bi = new ByteArrayInputStream(b);
msg.term = ByteUtil.readLong(bi);
msg.granted = ByteUtil.readBoolean(bi);
return msg;
}
@Override
public RaftMessageType getType() {
return type;
}
public void setType(RaftMessageType type) {
this.type = type;
}
public long getTerm() {
return term;
}
public void setTerm(long term) {
this.term = term;
}
public boolean isGranted() {
return granted;
}
public void setGranted(boolean granted) {
this.granted = granted;
}
}

View File

@ -0,0 +1,76 @@
package org.bdware.consistency.plugin.raft.algo.storage;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.conn.ByteUtil;
import org.bdware.sc.util.JsonUtil;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class LogEntry {
private long term;
private long index;
private ContractRequest command;
public long getTerm() {
return term;
}
public void setTerm(long term) {
this.term = term;
}
public long getIndex() {
return index;
}
public void setIndex(long index) {
this.index = index;
}
public ContractRequest getCommand() {
return command;
}
public void setCommand(ContractRequest command) {
this.command = command;
}
public byte[] getBytes() {
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ByteUtil.writeLong(bo, term);
ByteUtil.writeLong(bo, index);
byte[] dataByte = JsonUtil.toJson(command).getBytes(StandardCharsets.UTF_8);
try {
if (dataByte != null && dataByte.length != 0) {
ByteUtil.writeInt(bo, dataByte.length);
bo.write(dataByte);
} else {
ByteUtil.writeInt(bo, 0);
}
} catch (IOException e) {
e.printStackTrace();
}
return bo.toByteArray();
}
public static LogEntry parse(byte[] bytes) {
LogEntry entry = new LogEntry();
ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
entry.term = ByteUtil.readLong(bi);
entry.index = ByteUtil.readLong(bi);
int dataLen = ByteUtil.readInt(bi);
if (dataLen > 0) {
byte[] dataByte = ByteUtil.readBytes(bi, dataLen);
entry.command = JsonUtil.fromJson(new String(dataByte, StandardCharsets.UTF_8), ContractRequest.class);
} else {
entry.command = null;
}
return entry;
}
}

View File

@ -0,0 +1,188 @@
package org.bdware.consistency.plugin.raft.algo.storage;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.raft.algo.util.FileUtil;
import org.bdware.sc.bean.ContractRequest;
import java.io.File;
import java.io.IOException;
import java.util.*;
public class LogManager {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(LogManager.class);
private final String logDir;
private final TreeMap<Long, String> segmentStartIndexMap = new TreeMap<>(); // key is start index, value is file name of the segment
private int maxSegmentFileSize;
private Segment lastSegment;
public LogManager(String raftDataDir, int maxSegmentFileSize) {
this.logDir = raftDataDir + File.separator + "log";
this.maxSegmentFileSize = maxSegmentFileSize;
File dir = new File(logDir);
if (!dir.exists()) {
dir.mkdirs();
}
fillMap(dir);
if (!segmentStartIndexMap.isEmpty()) {
lastSegment = Segment.loadSegment(logDir, segmentStartIndexMap.lastEntry().getValue());
LOGGER.info("SegmentStart not Null!" + this.getLastEntryIndex());
} else {
LogEntry first = new LogEntry();
first.setIndex(0);
first.setTerm(0);
first.setCommand(new ContractRequest());
append(first);
LOGGER.info("SegmentStart is Null!" + this.getLastEntryIndex());
}
}
public LogEntry getLastEntry() {
if (lastSegment == null) {
return null;
}
return lastSegment.getLastEntry();
}
public long getLastEntryIndex() {
LogEntry entry = getLastEntry();
if (entry == null) {
return -1;
}
return entry.getIndex();
}
public LogEntry getEntry(long index) {
if (lastSegment == null || index < 0 || index > lastSegment.getEndIndex()) {
return null;
}
if (index >= lastSegment.getStartIndex() && index <= lastSegment.getEndIndex()) {
return lastSegment.getEntry(index);
}
Segment segment = Segment.loadSegment(logDir, segmentStartIndexMap.floorEntry(index).getValue());
return segment.getEntry(index);
}
public long append(LogEntry entry) {
long newLastLogIndex = this.getLastEntryIndex();
newLastLogIndex++;
byte[] entryBytes = entry.getBytes();
LOGGER.info("AppendLogEntry: last=" + this.getLastEntryIndex() + " -> toAdd=" + entry.getIndex());
try {
if (segmentStartIndexMap.isEmpty() || lastSegment == null ||
lastSegment.getFileSize() + entryBytes.length > maxSegmentFileSize) {
// close file and rename
if (lastSegment != null) {
FileUtil.closeFile(lastSegment.getRandomAccessFile());
String newFileName = String.format("%016x-%016x",
lastSegment.getStartIndex(), lastSegment.getEndIndex());
String newFullFileName = logDir + File.separator + newFileName;
File newFile = new File(newFullFileName);
String oldFullFileName = logDir + File.separator + lastSegment.getFileName();
File oldFile = new File(oldFullFileName);
FileUtils.moveFile(oldFile, newFile);
segmentStartIndexMap.put(lastSegment.getStartIndex(), newFileName);
}
String segmentFileName = "open-" + String.format("%016x", newLastLogIndex);
lastSegment = new Segment(logDir, segmentFileName);
segmentStartIndexMap.put(newLastLogIndex, segmentFileName);
}
if (entry.getIndex() == 0) {
entry.setIndex(newLastLogIndex);
}
if (entry.getIndex() != newLastLogIndex) {
LOGGER.warn("received invalid log index: {}", entry.getIndex());
throw new RuntimeException("received invalid log index: {}" + entry.getIndex());
}
lastSegment.append(entry);
} catch (IOException e) {
e.printStackTrace();
}
return newLastLogIndex;
}
public long append(List<LogEntry> entries) {
long newLastLogIndex = this.getLastEntryIndex();
for (LogEntry entry : entries) {
newLastLogIndex = append(entry);
}
return newLastLogIndex;
}
public List<LogEntry> getEntriesStartAt(long logIndex) {
Map.Entry<Long, String> floorEntry = segmentStartIndexMap.floorEntry(logIndex);
if (floorEntry == null) {
LOGGER.warn("received invalid log index: {}", logIndex);
throw new RuntimeException("received invalid log index: {}" + logIndex);
}
SortedMap<Long, String> tailMap = segmentStartIndexMap.tailMap(floorEntry.getKey());
List<LogEntry> entries = new ArrayList<>();
tailMap.forEach((key, value) -> {
Segment segment = Segment.loadSegment(logDir, value);
entries.addAll(segment.getEntriesNotLessThan(logIndex));
segment.close();
});
return entries;
}
public void deleteEntriesStartAt(long logIndex) {
LOGGER.info("deleteEntry:" + logIndex);
Map.Entry<Long, String> floorEntry = segmentStartIndexMap.floorEntry(logIndex - 1);
long floorKey;
if (floorEntry == null) {
floorKey = -1;
lastSegment = null;
} else {
floorKey = floorEntry.getKey();
if (lastSegment.getStartIndex() != floorKey) {
lastSegment.close();
lastSegment = Segment.loadSegment(logDir, floorEntry.getValue());
lastSegment.setMode("rw");
}
lastSegment.deleteEntriesStartAt(logIndex);
segmentStartIndexMap.put(lastSegment.getStartIndex(), lastSegment.getFileName());
}
Map.Entry<Long, String> lastMapEntry = segmentStartIndexMap.lastEntry();
while (lastMapEntry != null && lastMapEntry.getKey() > floorKey) {
FileUtil.deleteFile(logDir, lastMapEntry.getValue());
segmentStartIndexMap.remove(lastMapEntry.getKey());
lastMapEntry = segmentStartIndexMap.lastEntry();
}
}
public void setMaxSegmentFileSize(int maxSegmentFileSize) {
this.maxSegmentFileSize = maxSegmentFileSize;
}
private void fillMap(File dir) {
if (dir == null) return;
File[] files = dir.listFiles();
if (files == null) {
LOGGER.warn("get file in dir({}) error", dir.getName());
return;
}
for (File file : files) {
if (file.isFile()) {
long startIndexInFile;
String fileName = file.getName();
String[] splitArray = fileName.split("-");
if (splitArray.length != 2) {
LOGGER.warn("segment filename[{}] is not valid", fileName);
throw new RuntimeException("segment filename[{}] is not valid" + fileName);
}
if (splitArray[0].equals("open")) {
startIndexInFile = Long.parseLong(splitArray[1], 16);
} else {
startIndexInFile = Long.parseLong(splitArray[0], 16);
}
segmentStartIndexMap.put(startIndexInFile, fileName);
}
}
}
}

View File

@ -0,0 +1,185 @@
package org.bdware.consistency.plugin.raft.algo.storage;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.raft.algo.util.FileUtil;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
public class Segment {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(Segment.class);
private long startIndex;
private long endIndex;
private long fileSize;
private String dir;
private String fileName;
private RandomAccessFile randomAccessFile;
private List<Record> records;
public void close() {
FileUtil.closeFile(randomAccessFile);
}
public void setMode(String mode) {
FileUtil.closeFile(randomAccessFile);
randomAccessFile = FileUtil.openFile(dir, fileName, mode);
}
public static class Record {
public long offset;
public LogEntry entry;
public Record(long offset, LogEntry entry) {
this.offset = offset;
this.entry = entry;
}
}
// Create new segment file
public Segment(String dir, String name) {
this.dir = dir;
fileName = name;
records = new ArrayList<>();
}
public Segment(String dir) {
this.dir = dir;
records = new ArrayList<>();
}
public LogEntry getEntry(long index) {
if (records.size() == 0 || index < startIndex || index > endIndex) {
return null;
}
int indexInList = (int) (index - startIndex);
return records.get(indexInList).entry;
}
public List<LogEntry> getEntriesNotLessThan(long index) {
List<LogEntry> entries = new ArrayList<>();
if (records.size() == 0 || index > endIndex) {
return entries;
}
int indexInList = Math.max(0, (int) (index - startIndex));
for (int i = indexInList; i < records.size(); i++) {
entries.add(records.get(i).entry);
}
return entries;
}
public void deleteEntriesStartAt(long index) {
if (records.size() == 0 || index > endIndex) {
return;
}
int indexInList = (int) (index - startIndex);
if (indexInList <= 0) {
records.clear();
FileUtil.closeFile(randomAccessFile);
FileUtil.deleteFile(dir, fileName);
fileSize = 0;
return;
}
endIndex = index - 1;
fileSize = records.get(indexInList).offset;
records.removeAll(records.subList(indexInList, records.size()));
FileUtil.truncate(randomAccessFile, fileSize);
FileUtil.closeFile(randomAccessFile);
String oldFullFileName = dir + File.separator + fileName;
fileName = String.format("open-%016x", startIndex);
String newFullFileName = dir + File.separator + fileName;
FileUtil.moveFile(oldFullFileName, newFullFileName);
randomAccessFile = FileUtil.openFile(dir, fileName, "rw");
}
public List<LogEntry> getEntries() {
return getEntriesNotLessThan(startIndex);
}
public LogEntry getLastEntry() {
return getEntry(endIndex);
}
public void append(LogEntry entry) {
if (randomAccessFile == null) {
randomAccessFile = FileUtil.createFile(dir, fileName, "rw");
}
if (records.isEmpty()) {
startIndex = entry.getIndex();
}
try {
records.add(new Record(randomAccessFile.getFilePointer(), entry));
FileUtil.writeLogEntryToFile(randomAccessFile, entry);
endIndex = entry.getIndex();
fileSize = randomAccessFile.length();
} catch (IOException e) {
e.printStackTrace();
}
}
public static Segment loadSegment(String dir, String fileName) {
Segment segment = new Segment(dir);
try {
String[] splitArray = fileName.split("-");
if (splitArray.length != 2) {
LOGGER.warn("segment filename[{}] is not valid", fileName);
throw new RuntimeException("segment filename[{}] is not valid" + fileName);
}
segment.fileName = fileName;
if (splitArray[0].equals("open")) {
segment.randomAccessFile = FileUtil.openFile(dir, fileName, "rw");
} else {
segment.randomAccessFile = FileUtil.openFile(dir, fileName, "r");
}
segment.fileSize = segment.randomAccessFile.length(); // ioException
long offset = 0;
while (offset < segment.fileSize) {
LogEntry entry = FileUtil.readLogEntryFromFile(segment.randomAccessFile);
if (entry == null) {
throw new RuntimeException("read segment log failed");
}
Record record = new Record(offset, entry);
segment.records.add(record);
offset = segment.randomAccessFile.getFilePointer(); // ioException
}
int entrySize = segment.records.size();
if (entrySize > 0) {
segment.startIndex = segment.getRecords().get(0).entry.getIndex();
segment.endIndex = segment.getRecords().get(entrySize - 1).entry.getIndex();
}
return segment;
} catch (IOException ioException) {
LOGGER.warn("loadSegment exception:", ioException);
throw new RuntimeException("loadSegment error" + ioException);
}
}
public long getStartIndex() {
return startIndex;
}
public long getEndIndex() {
return endIndex;
}
public long getFileSize() {
return fileSize;
}
public String getFileName() {
return fileName;
}
public RandomAccessFile getRandomAccessFile() {
return randomAccessFile;
}
public List<Record> getRecords() {
return records;
}
}

View File

@ -0,0 +1,26 @@
package org.bdware.consistency.plugin.raft.algo.storage;
import org.bdware.sc.units.PubKeyNode;
// todo
public class StateManager {
String filename;
static long currentTerm;
static PubKeyNode node;
public static long loadCurrentTerm() {
return currentTerm;
}
public static PubKeyNode loadVotedFor() {
return node;
}
public static void saveCurrentTerm(long currentTerm) {
StateManager.currentTerm = currentTerm;
}
public static void saveVotedFor(PubKeyNode node) {
StateManager.node = node;
}
}

View File

@ -0,0 +1,143 @@
package org.bdware.consistency.plugin.raft.algo.util;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;
import org.bdware.consistency.plugin.raft.algo.message.IRaftMessage;
import org.bdware.consistency.plugin.raft.algo.storage.LogEntry;
import org.bdware.consistency.plugin.raft.algo.storage.Segment;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
public class FileUtil {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(FileUtil.class);
public static RandomAccessFile createFile(String dir, String fileName, String mode) {
try {
String fullFileName = dir + File.separator + fileName;
File file = new File(fullFileName);
file.createNewFile();
return new RandomAccessFile(file, mode);
} catch (FileNotFoundException e) {
LOGGER.warn("file not fount, file={}", fileName);
throw new RuntimeException("file not found, file=" + fileName);
} catch (IOException e) {
LOGGER.warn("create file error, msg={}", e.getMessage());
throw new RuntimeException("create file error, msg={}" + e.getMessage());
}
}
public static RandomAccessFile openFile(String dir, String fileName, String mode) {
try {
String fullFileName = dir + File.separator + fileName;
File file = new File(fullFileName);
return new RandomAccessFile(file, mode);
} catch (FileNotFoundException e) {
LOGGER.warn("file not fount, file={}", fileName);
throw new RuntimeException("file not found, file=" + fileName);
}
}
public static void closeFile(RandomAccessFile randomAccessFile) {
try {
if (randomAccessFile != null) {
randomAccessFile.close();
}
} catch (IOException e) {
LOGGER.warn("close file error, msg={}", e.getMessage());
throw new RuntimeException("close file error, msg={}" + e.getMessage());
}
}
public static void deleteFile(String dir, String fileName) {
String fullFileName = dir + File.separator + fileName;
File file = new File(fullFileName);
try {
FileUtils.forceDelete(file);
} catch (IOException e) {
LOGGER.warn("delete file error, msg={}", e.getMessage());
throw new RuntimeException("delete file error, msg={}" + e.getMessage());
}
}
public static void moveFile(String oldFullFileName, String newFullFileName) {
File oldFile = new File(oldFullFileName);
File newFile = new File(newFullFileName);
try {
FileUtils.moveFile(oldFile, newFile);
} catch (IOException e) {
LOGGER.warn("move file error, msg={}", e.getMessage());
throw new RuntimeException("move file error, msg={}" + e.getMessage());
}
}
public static void truncate(RandomAccessFile randomAccessFile, long size) {
FileChannel fileChannel = randomAccessFile.getChannel();
try {
fileChannel.truncate(size);
} catch (IOException e) {
LOGGER.warn("truncate file error, msg={}", e.getMessage());
throw new RuntimeException("truncate file error, msg={}" + e.getMessage());
} finally {
try {
fileChannel.close();
} catch (IOException e) {
LOGGER.warn("close fileChannel error, msg={}", e.getMessage());
throw new RuntimeException("close fileChannel error, msg={}" + e.getMessage());
}
}
}
public static LogEntry readLogEntryFromFile(RandomAccessFile raf) {
try {
long crc32FromFile = raf.readLong();
int dataLen = raf.readInt();
int hasReadLen = (Long.SIZE + Integer.SIZE) / Byte.SIZE;
if (raf.length() - hasReadLen < dataLen) {
LOGGER.warn("file remainLength < dataLen");
return null;
}
byte[] data = new byte[dataLen];
int readLen = raf.read(data);
if (readLen != dataLen) {
LOGGER.warn("readLen != dataLen");
return null;
}
long crc32FromData = getCRC32(data);
if (crc32FromFile != crc32FromData) {
LOGGER.warn("crc32 check failed");
return null;
}
return LogEntry.parse(data);
} catch (Exception ex) {
LOGGER.warn("readLogEntryFromFile meet exception, {}", ex.getMessage());
return null;
}
}
public static void writeLogEntryToFile(RandomAccessFile raf, LogEntry entry) {
byte[] bytes = entry.getBytes();
long crc32 = getCRC32(bytes);
try {
raf.writeLong(crc32);
raf.writeInt(bytes.length);
raf.write(bytes);
} catch (IOException ex) {
LOGGER.warn("write LogEntry to file error, msg={}", ex.getMessage());
throw new RuntimeException("write LogEntry to file error");
}
}
public static long getCRC32(byte[] data) {
CRC32 crc32 = new CRC32();
crc32.update(data);
return crc32.getValue();
}
}

View File

@ -0,0 +1,51 @@
package org.bdware.consistency.plugin.raft.algo.util;
import org.bdware.sc.ContractManager;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class HeartBeatUtil {
private static HeartBeatUtil instance;
Map<TimerTask, ScheduledFuture> recordedFuture;
private HeartBeatUtil() {
recordedFuture = new ConcurrentHashMap<>();
}
public static HeartBeatUtil getInstance() {
if (null == instance) {
instance = new HeartBeatUtil();
}
return instance;
}
public synchronized void cancel(TimerTask timerTask) {
try {
if (recordedFuture.containsKey(timerTask)) {
ScheduledFuture future = recordedFuture.get(timerTask);
future.cancel(false);
recordedFuture.remove(timerTask);
timerTask.cancel();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public synchronized void schedule(TimerTask timerTask, int delay, int period) {
try {
if (!recordedFuture.containsKey(timerTask)) {
ScheduledFuture<?> future =
ContractManager.scheduledThreadPool.scheduleWithFixedDelay(
timerTask, delay, period, TimeUnit.MILLISECONDS);
recordedFuture.put(timerTask, future);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,91 @@
package org.bdware.consistency.plugin.raft.algo.message;
import org.bdware.consistency.plugin.raft.algo.storage.LogEntry;
import org.bdware.sc.bean.ContractRequest;
import org.bdware.sc.units.PubKeyNode;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class MessageTest {
@Test
public void testRequestVote() {
RequestVote msg = new RequestVote();
msg.setTerm(98567);
PubKeyNode node = new PubKeyNode();
node.pubkey = "kjfhkefbviury";
msg.setCandidate(node);
msg.setLastLogTerm(8956737);
msg.setLastLogIndex(42956);
byte[] bytes = RaftMessageParser.setMessage(msg).getBytes();
RequestVote message = (RequestVote) RaftMessageParser.parse(bytes);
assert msg.getTerm() == message.getTerm();
assert msg.getCandidate().pubkey.equals(message.getCandidate().pubkey);
assert msg.getLastLogTerm() == message.getLastLogTerm();
assert msg.getLastLogIndex() == message.getLastLogIndex();
}
@Test
public void testRequestVoteResponse() {
RequestVoteResponse msg = new RequestVoteResponse();
msg.setTerm(4563657);
msg.setGranted(true);
byte[] bytes = RaftMessageParser.setMessage(msg).getBytes();
RequestVoteResponse message = (RequestVoteResponse) RaftMessageParser.parse(bytes);
assert msg.getTerm() == message.getTerm();
assert msg.isGranted() == message.isGranted();
}
@Test
public void testAppendEntries() {
AppendEntries msg = new AppendEntries();
msg.setTerm(353);
PubKeyNode node = new PubKeyNode();
node.pubkey = "ksuhgkjfjksdbfchj";
msg.setLeader(node);
msg.setPrevLogIndex(298476957);
msg.setPrevLogTerm(5467);
msg.setLeaderCommit(586793);
List<LogEntry> entries = new ArrayList<>();
for (int i = 0; i < 10; i++) {
LogEntry entry = new LogEntry();
entry.setTerm(i);
entry.setIndex(i * 100);
ContractRequest command = new ContractRequest();
command.setAction(String.format("%20d-%20d", i, i * 100));
entry.setCommand(command);
entries.add(entry);
}
msg.setEntries(entries);
byte[] bytes = RaftMessageParser.setMessage(msg).getBytes();
AppendEntries message = (AppendEntries) RaftMessageParser.parse(bytes);
assert msg.getTerm() == message.getTerm();
assert msg.getLeader().pubkey.equals(message.getLeader().pubkey);
assert msg.getPrevLogIndex() == message.getPrevLogIndex();
assert msg.getPrevLogTerm() == message.getPrevLogTerm();
assert msg.getLeaderCommit() == message.getLeaderCommit();
assert msg.getEntries().size() == message.getEntries().size();
assert msg.getEntries().get(0).getCommand().getAction().equals(message.getEntries().get(0).getCommand().getAction());
assert msg.getEntries().get(9).getCommand().getAction().equals(message.getEntries().get(9).getCommand().getAction());
}
@Test
public void testAppendEntriesResponse() {
AppendEntriesResponse msg = new AppendEntriesResponse();
msg.setTerm(5349679);
msg.setSuccess(true);
byte[] bytes = RaftMessageParser.setMessage(msg).getBytes();
AppendEntriesResponse message = (AppendEntriesResponse) RaftMessageParser.parse(bytes);
assert msg.getTerm() == message.getTerm();
assert msg.isSuccess() == message.isSuccess();
}
}

View File

@ -0,0 +1,66 @@
package org.bdware.consistency.plugin.raft.algo.storage;
import org.bdware.sc.bean.ContractRequest;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
public class LogManagerTest {
@Test
public void testLogManager() {
LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000);
List<LogEntry> entries = new ArrayList<>();
LogEntry entry;
for (int i = 0; i < 1000; i++) {
entry = new LogEntry();
entry.setTerm(i / 10);
entry.setIndex(i);
ContractRequest command = new ContractRequest();
command.setAction(String.format("%20d-%20d", i, i * 100));
entry.setCommand(command);
entries.add(entry);
}
long idx = logManager.append(entries);
assert idx == 999;
}
@Test
public void testLogManager2() {
LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000);
LogEntry entry = logManager.getEntry(386);
assert entry.getTerm() == 38;
assert entry.getIndex() == 386;
assert entry.getCommand().getAction().equals(String.format("%20d-%20d", 386, 386 * 100));
entry = logManager.getLastEntry();
assert entry.getTerm() == 99;
assert entry.getIndex() == 999;
assert entry.getCommand().getAction().equals(String.format("%20d-%20d", 999, 999 * 100));
entry = logManager.getEntry(0);
assert entry.getTerm() == 0;
assert entry.getIndex() == 0;
assert entry.getCommand().getAction().equals(String.format("%20d-%20d", 0, 0));
List<LogEntry> entries = new ArrayList<>();
for (int i = 1000; i < 1100; i++) {
entry = new LogEntry();
entry.setTerm(i / 10);
entry.setIndex(i);
ContractRequest command = new ContractRequest();
command.setAction(String.format("%20d-%20d", i, i * 100));
entry.setCommand(command);
entries.add(entry);
}
long index = logManager.append(entries);
assert index == 1099;
}
@Test
public void testLogManager3() {
LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000);
List<LogEntry> entryList = logManager.getEntriesStartAt(677);
assert entryList.size() == 1100 - 677;
assert entryList.get(100).getIndex() == 777;
logManager.deleteEntriesStartAt(983);
assert logManager.getLastEntryIndex() == 982;
}
}