diff --git a/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java b/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java index df1687a..48a4d0d 100644 --- a/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/pbft/PBFTExecutor.java @@ -171,7 +171,6 @@ public class PBFTExecutor extends AbstractContextContractExecutor { return; } req.setContractID(cmActions.getManager().getContractIDByName(req.getContractID())); - // 三个相同requestID进来的时候,会有冲突。 // 仅在此处有冲突么? // 这里是从MasterServer->MasterClient,请求的是"executeContractLocally"。 diff --git a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java index 7a5af99..2aabfd4 100644 --- a/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java +++ b/src/main/java/org/bdware/consistency/plugin/ra/RequestAllExecutor.java @@ -218,7 +218,7 @@ public class RequestAllExecutor extends AbstractContextContractExecutor { ResultCallback originalCallback; Set nodeIDs = new HashSet<>(); // 已收到返回结果的节点 - ResultMerger( + public ResultMerger( final ResultCallback originalCb, final int count, final int request_seq, diff --git a/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutor.java b/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutor.java new file mode 100644 index 0000000..c3f8032 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutor.java @@ -0,0 +1,111 @@ +package org.bdware.consistency.plugin.raft; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.common.AbstractContextContractExecutor; +import org.bdware.consistency.plugin.pbft.ContractCluster; +import org.bdware.consistency.plugin.ra.RequestAllExecutor; +import org.bdware.consistency.plugin.raft.algo.RaftAlgorithm; +import org.bdware.consistency.plugin.raft.algo.RaftConfig; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.Node; +import org.bdware.sc.conn.OnHashCallback; +import org.bdware.sc.conn.ResultCallback; +import org.bdware.sc.consistency.Committer; +import org.bdware.sc.units.MultiContractMeta; +import org.bdware.sc.units.PubKeyNode; +import org.bdware.sc.util.JsonUtil; +import org.zz.gmhelper.SM2KeyPair; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RAFTExecutor extends AbstractContextContractExecutor { + private static final Logger LOGGER = LogManager.getLogger(RAFTExecutor.class); + private final ContractCluster contractCluster; + RaftAlgorithm raft; + + public RAFTExecutor(String contractID, String[] memberPubKeys) { + + RaftConfig config = new RaftConfig(); + PubKeyNode selfNode = new PubKeyNode(); + selfNode.pubkey = globalConf.getNodeID(); + List members = new ArrayList<>(); + for (String memberPubKey : memberPubKeys) { + PubKeyNode pubkeyNode = new PubKeyNode(); + pubkeyNode.pubkey = memberPubKey; + members.add(pubkeyNode); + } + contractCluster = new ContractCluster(contractID, members); + final MultiContractMeta cei = cmActions.getManager().multiContractRecorder.getMultiContractMeta(contractID); + raft = new RaftAlgorithm(config, selfNode, contractCluster, new Committer() { + @Override + public void onCommit(ContractRequest request) { + ResultCallback ret = null; + final long startTime = System.currentTimeMillis(); + ret = new ResultCallback() { + @Override + public void onResult(String str) { + Map ret = new HashMap<>(); + ret.put("action", "receiveTrustfullyResult"); + SM2KeyPair keyPair = globalConf.getKeyPair(); + ret.put("nodeID", keyPair.getPublicKeyStr()); + ret.put("responseID", request.getRequestID()); + ret.put("executeTime", (System.currentTimeMillis() - startTime) + ""); + ret.put("data", str); + cei.setLastExeSeq(request.seq); + networkManager.sendToAgent(raft.getLeader().pubkey, JsonUtil.toJson(ret)); + } + }; + cmActions.getManager().executeLocallyAsync(request, ret, null); + } + }); + + //TODO 在改变角色时需要通知node cluster + if (cei.isMaster()) { + raft.convertToLeader(); + } else { + raft.convertToFollower(); + } + } + + @Override + public void execute(String requestID, ContractRequest req, ResultCallback rcb, OnHashCallback hcb) { + LOGGER.debug(JsonUtil.toJson(req)); + MultiContractMeta multiContractMeta = cmActions.getManager().multiContractRecorder.getMultiContractMeta(req.getContractID()); + if (multiContractMeta == null || !multiContractMeta.isMaster()) { + cmActions.getManager().executeContractOnOtherNodes(req, rcb); + return; + } + + ResultCallback collector; + int count = contractCluster.getNodes().size(); + collector = networkManager.createResultCallback( + requestID, new RequestAllExecutor.ResultMerger(rcb, count, req.seq, req.getContractID()), count); + masterServerTCPAction.getSync().sleep(requestID, collector); + raft.insertLogEntry(req); + } + + + @Override + public void onRecover(Map args) { + super.onRecover(args); + } + + @Override + public void onDeliverBlock(String data) { + super.onDeliverBlock(data); + } + + @Override + public void close() { + super.close(); + } + + @Override + public void onSyncMessage(Node node, byte[] data) { + raft.onMessage(node, data); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutorFactory.java b/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutorFactory.java new file mode 100644 index 0000000..7b9a37d --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/RAFTExecutorFactory.java @@ -0,0 +1,22 @@ +package org.bdware.consistency.plugin.raft; + +import org.bdware.consistency.plugin.pbft.PBFTExecutor; +import org.bdware.sdk.consistency.api.ContractExecutorFactory; +import org.bdware.server.trustedmodel.ContractExecutor; + +import java.util.Map; + +public class RAFTExecutorFactory implements ContractExecutorFactory { + @Override + public String getExecutorName() { + return "RAFT"; + } + + @Override + public ContractExecutor getInstance(Map args) { +// int nodeSize = (int) args.get("nodeSize"); + String contractID = (String) args.get("contractID"); + String[] memberPubKeys = (String[]) args.get("members"); + return new RAFTExecutor(contractID, memberPubKeys); + } +} \ No newline at end of file diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftAlgorithm.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftAlgorithm.java new file mode 100644 index 0000000..65a6564 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftAlgorithm.java @@ -0,0 +1,413 @@ +package org.bdware.consistency.plugin.raft.algo; + +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.pbft.ContractCluster; +import org.bdware.consistency.plugin.raft.algo.message.*; +import org.bdware.consistency.plugin.raft.algo.storage.LogEntry; +import org.bdware.consistency.plugin.raft.algo.storage.LogManager; +import org.bdware.consistency.plugin.raft.algo.storage.StateManager; +import org.bdware.consistency.plugin.raft.algo.util.HeartBeatUtil; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.Node; +import org.bdware.sc.consistency.CommitAlgorithm; +import org.bdware.sc.consistency.Committer; +import org.bdware.sc.units.PubKeyNode; +import org.bdware.sc.units.TrustfulExecutorConnection; + +import java.util.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class RaftAlgorithm implements CommitAlgorithm { + private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(RaftAlgorithm.class); + private ContractCluster connection; + Committer committer; + + private enum State {LEADER, FOLLOWER, CANDIDATE} + + State state; + PubKeyNode leader; + final PubKeyNode self; + List servers; + + StateManager stateManager; + // Persistent state on all servers + long currentTerm; + PubKeyNode votedFor; + LogManager logManager; + + // Volatile state on all servers + long commitIndex; + long lastApplied; + + // Volatile state on leaders + Map nextIndex; + Map matchIndex; + + + int grantedVoteCount; + + // electionTimeout + TimerTask electionTimerTask; + TimerTask heartbeatTimerTask; + long lastActivated; + + //config + RaftConfig config; + + // Locks and Conditions + private Lock lock = new ReentrantLock(); + + + public RaftAlgorithm(RaftConfig config, PubKeyNode node, ContractCluster conn, Committer committer) { + this.config = config; + self = node; + connection = conn; + this.committer = committer; + state = State.FOLLOWER; + leader = null; + servers = connection.getNodes(); + currentTerm = StateManager.loadCurrentTerm(); + votedFor = StateManager.loadVotedFor(); + logManager = new LogManager(config.raftLogDir, config.maxSegmentFileSize); + commitIndex = 0; + lastApplied = 0; + + electionTimerTask = new TimerTask() { + @Override + public void run() { + long cur = System.currentTimeMillis(); + if (cur - lastActivated > config.getElectionTimeout()) { + startElection(); + } + } + }; + heartbeatTimerTask = new TimerTask() { + @Override + public void run() { + lastActivated = System.currentTimeMillis(); + broadcastAppendEntries(); + } + }; + resetElectionTimer(); + } + + + public boolean isLeader() { + return state == State.LEADER; + } + + @Override + public void onMessage(Node sender, byte[] msg) { + if (((PubKeyNode) sender).pubkey.equals(self.pubkey)) { + LOGGER.info("IGNORE SELF:" + msg.length); + return; + } + LOGGER.info("RECEIVE:" + msg.length); + if (!isServerValid((PubKeyNode) sender)) { + LOGGER.debug("message from an invalid server ({})!", sender.toString()); + return; + } + IRaftMessage message = RaftMessageParser.parse(msg); + LOGGER.info("RECEIVE:" + message.getType()); + lock.lock(); + try { + IRaftMessage response; + switch (message.getType()) { + case RequestVote: + response = onRequestVote((RequestVote) message); + persist(); + connection.sendMessage((PubKeyNode) sender, new RaftMessageParser(response).getBytes()); + break; + case RequestVoteResponse: + onRequestVoteResponse((RequestVoteResponse) message); + break; + case AppendEntries: + response = onAppendEntries((AppendEntries) message); + persist(); + connection.sendMessage((PubKeyNode) sender, new RaftMessageParser(response).getBytes()); + break; + case AppendEntriesResponse: + onAppendEntriesResponse((PubKeyNode) sender, (AppendEntriesResponse) message); + break; + case Unknown: + break; + default: + throw new IllegalStateException("Unexpected value: " + message.getType()); + } + } finally { + lock.unlock(); + } + } + + // in lock + private RequestVoteResponse onRequestVote(RequestVote request) { + if (request.getTerm() < currentTerm) { + LOGGER.debug("Rejected RequestVote message in term {}" + + ", because the message is stale. My term is {}.", request.getTerm(), currentTerm); + return new RequestVoteResponse(currentTerm, false); + } + if (request.getTerm() == currentTerm) { + if (votedFor != null && !votedFor.pubkey.equals(request.getCandidate().pubkey)) { + LOGGER.debug("Rejected RequestVote message in term {}" + + ", because I have voted for another server. ", request.getTerm()); + return new RequestVoteResponse(currentTerm, false); + } + } else { + currentTerm = request.getTerm(); + leader = null; + votedFor = null; + convertToFollower(); + } + if (!isUpToDateThanMyLog(request.getLastLogTerm(), request.getLastLogIndex())) { + LOGGER.debug("Rejected RequestVote message, because my log is more up-to-date. "); + return new RequestVoteResponse(currentTerm, false); + } + // (request.getTerm() == currentTerm && votedFor == null) || request.getTerm() > currentTerm + // || request server's log is more up-to-date than mine + votedFor = request.getCandidate(); + LOGGER.info("Granted RequestVote message from server (pubKey={}) in term {}", + request.getCandidate().pubkey, currentTerm); + + return new RequestVoteResponse(currentTerm, true); + } + + // in lock + private void onRequestVoteResponse(RequestVoteResponse response) { + if (response.getTerm() > currentTerm) { + currentTerm = response.getTerm(); + leader = null; + votedFor = null; + convertToFollower(); + } else if (state == State.CANDIDATE && response.getTerm() == currentTerm && response.isGranted()) { + ++grantedVoteCount; + if (grantedVoteCount > servers.size() / 2) { + convertToLeader(); + LOGGER.info("Get votes from a majority. Convert to leader."); + } + } + } + + // in lock + private AppendEntriesResponse onAppendEntries(AppendEntries request) { + LogEntry lastEntry = logManager.getLastEntry(); + if (request.getTerm() < currentTerm) { + LOGGER.debug("Rejected AppendEntries message in term {}" + + ", because the message is stale. My term is {}.", request.getTerm(), currentTerm); + return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); + } + if (request.getTerm() > currentTerm) { + currentTerm = request.getTerm(); + leader = null; + votedFor = null; + convertToFollower(); + } + if (leader == null) { + leader = request.getLeader(); + LOGGER.info("new leader={}", leader.pubkey); + } + if (!leader.pubkey.equals(request.getLeader().pubkey)) { + LOGGER.warn("Another peer={} declares that it is the leader " + + "at term={} which was occupied by leader={}", + request.getLeader().pubkey, request.getTerm(), leader.pubkey); + return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); + } + resetElectionTimer(); + lastActivated = System.currentTimeMillis(); + LogEntry entry = logManager.getEntry(request.getPrevLogIndex()); + if (entry == null) { + return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), false, self); + } + if (entry.getTerm() != request.getTerm() || logManager.getLastEntryIndex() > request.getPrevLogIndex()) { + logManager.deleteEntriesStartAt(request.getPrevLogIndex()); + lastEntry = logManager.getLastEntry(); + } + List newEntries = request.getEntries(); + if (newEntries.size() == 0) { + return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), true, self); + } + logManager.append(newEntries); + lastEntry = logManager.getLastEntry(); + if (request.getLeaderCommit() > commitIndex) { + commitIndex = Math.min(request.getLeaderCommit(), newEntries.get(newEntries.size() - 1).getIndex()); + } + LOGGER.info("RECEIVE APPENDENTRY: commitIndex=" + commitIndex + " lastApplied:" + lastApplied); + while (commitIndex > lastApplied) { + applyToStateMachine(lastApplied + 1); + lastApplied++; + } + return new AppendEntriesResponse(currentTerm, lastEntry.getIndex(), true, self); + } + + // in lock + private void onAppendEntriesResponse(PubKeyNode sender, AppendEntriesResponse response) { + if (response.getTerm() > currentTerm) { + currentTerm = response.getTerm(); + leader = null; + votedFor = null; + convertToFollower(); + } else if (state == State.LEADER && response.getTerm() == currentTerm) { + if (response.isSuccess()) { + if (matchIndex.containsKey(sender.pubkey) && matchIndex.get(sender.pubkey) > response.getLastLogIndex()) { + LOGGER.warn("node={} matchIndex decrease! prev matchIndex={}, lastLogIndex={}", + sender.pubkey, matchIndex.get(sender.pubkey), response.getLastLogIndex()); + throw new RuntimeException("matchIndex error"); + } + matchIndex.put(sender.pubkey, response.getLastLogIndex()); + } + nextIndex.put(sender.pubkey, response.getLastLogIndex() + 1); + } + } + + // todo + private void applyToStateMachine(long logIndex) { + LogEntry entry = logManager.getEntry(logIndex); + LOGGER.info("Commit: requestID = " + entry.getCommand().getRequestID()); + committer.onCommit(entry.getCommand()); + } + + public void insertLogEntry(ContractRequest req) { + LogEntry entry = new LogEntry(); + entry.setCommand(req); + entry.setIndex(logManager.getLastEntryIndex() + 1); + entry.setTerm(currentTerm); + logManager.append(entry); + broadcastAppendEntries(); + } + + private void broadcastAppendEntries() { + lock.lock(); + try { + LogEntry lastEntry = logManager.getLastEntry(); + long lastLogIndex = lastEntry.getIndex(); + nextIndex.forEach((pubKey, index) -> { + PubKeyNode node = new PubKeyNode(); + node.pubkey = pubKey; + if (lastLogIndex >= index) { + sendAppendEntries(node, index); + } else { + AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), lastEntry.getTerm(), commitIndex, null); + connection.sendMessage(node, new RaftMessageParser(req).getBytes()); + } + }); + List indexList = new ArrayList<>(matchIndex.values()); + Collections.sort(indexList); + + long majorityIndex = indexList.get((indexList.size() - 1) / 2); + LogEntry entry = logManager.getEntry(majorityIndex); + if (majorityIndex > commitIndex && entry.getTerm() == currentTerm) { + commitIndex = majorityIndex; + } + while (commitIndex > lastApplied) { + applyToStateMachine(lastApplied + 1); + lastApplied++; + } + } finally { + lock.unlock(); + } + } + + private void sendAppendEntries(PubKeyNode follower, long logIndex) { + LogEntry prevLog = logManager.getEntry(logIndex - 1); + List entries = logManager.getEntriesStartAt(logIndex); + AppendEntries request = new AppendEntries(currentTerm, leader, prevLog.getIndex(), prevLog.getTerm(), commitIndex, entries); + connection.sendMessage(follower, new RaftMessageParser(request).getBytes()); + } + + private void persist() { + StateManager.saveCurrentTerm(currentTerm); + StateManager.saveVotedFor(votedFor); + } + + public void convertToFollower() { + if (state == State.FOLLOWER) { + return; + } + state = State.FOLLOWER; + stopHeartbeat(); + } + + // in lock + public void convertToLeader() { + state = State.LEADER; + leader = self; + nextIndex = new HashMap<>(); + matchIndex = new HashMap<>(); + startHeartbeat(); + LogEntry lastEntry = logManager.getLastEntry(); + for (PubKeyNode server : servers) { + if (server != null) { + if (lastEntry == null) + nextIndex.put(server.pubkey, 1L); + else + nextIndex.put(server.pubkey, lastEntry.getIndex() + 1); + matchIndex.put(server.pubkey, 0L); + // AppendEntries req = new AppendEntries(currentTerm, self, lastEntry.getIndex(), lastEntry.getTerm(), commitIndex, null); + // connection.sendMessage(server,new RaftMessageParser(req).getBytes()); + } + } + } + + private void stopHeartbeat() { + HeartBeatUtil.getInstance().cancel(heartbeatTimerTask); + } + + private void startHeartbeat() { + HeartBeatUtil.getInstance().schedule(heartbeatTimerTask, 0, config.heartbeatTimeout); + } + + private void startElection() { + RequestVote request; + lock.lock(); + try { + ++currentTerm; + state = State.CANDIDATE; + votedFor = self; + grantedVoteCount = 1; + LogEntry lastLog = logManager.getLastEntry(); + request = new RequestVote(currentTerm, self, lastLog.getIndex(), lastLog.getTerm()); + } finally { + lock.unlock(); + } + connection.broadcast(request.getBytes()); + } + + private void resetElectionTimer() { + HeartBeatUtil.getInstance().cancel(electionTimerTask); + HeartBeatUtil.getInstance().schedule(electionTimerTask, config.getElectionTimeout(), 1000); + } + + + private boolean isServerValid(PubKeyNode server) { + if (server == null) return false; + List nodes = connection.getNodes(); + for (PubKeyNode node : nodes) { + if (node.pubkey.equals(server.pubkey)) { + return true; + } + } + return false; + } + + private boolean isUpToDateThanMyLog(long lastLogTerm, long lastLogIndex) { + LogEntry myLastLog = logManager.getLastEntry(); + long myLastLogIndex = myLastLog.getIndex(); + long myLastLogTerm = myLastLog.getTerm(); + return lastLogTerm > myLastLogTerm + || (lastLogTerm == myLastLogTerm && lastLogIndex >= myLastLogIndex); + } + + @Override + public void setCommitter(Committer c) { + committer = c; + } + + @Override + public void setConnection(TrustfulExecutorConnection c) { + connection = (ContractCluster) c; + } + + public PubKeyNode getLeader() { + return leader; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftConfig.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftConfig.java new file mode 100644 index 0000000..cc9da98 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/RaftConfig.java @@ -0,0 +1,12 @@ +package org.bdware.consistency.plugin.raft.algo; + +public class RaftConfig { + public String raftLogDir = "./ContractDB/RAFTAll/"; + public int maxSegmentFileSize = 100000000; + private int electionTimeout = 90000; // unit: millisecond + public int heartbeatTimeout = 30000; // unit: millisecond + + public int getElectionTimeout() { + return electionTimeout; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntries.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntries.java new file mode 100644 index 0000000..f7608a9 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntries.java @@ -0,0 +1,146 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +import org.bdware.consistency.plugin.raft.algo.storage.LogEntry; +import org.bdware.sc.conn.ByteUtil; +import org.bdware.sc.units.PubKeyNode; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class AppendEntries implements IRaftMessage { + RaftMessageType type; + long term; // leader's term + PubKeyNode leader; // leader's id + long prevLogIndex; + long prevLogTerm; + long leaderCommit; // leader's commitIndex + List entries; + + public AppendEntries() { + this.type = RaftMessageType.AppendEntries; + } + + public AppendEntries(long term, PubKeyNode leader, long prevLogIndex, long prevLogTerm, long leaderCommit, List entries) { + this.type = RaftMessageType.AppendEntries; + this.term = term; + this.leader = leader; + this.prevLogIndex = prevLogIndex; + this.prevLogTerm = prevLogTerm; + this.leaderCommit = leaderCommit; + this.entries = entries; + } + + @Override + public byte[] getBytes() { + ByteArrayOutputStream msg = new ByteArrayOutputStream(); + try { + ByteUtil.writeLong(msg, term); + if (leader != null && !leader.pubkey.equals("")) { + byte[] pubkey = leader.pubkey.getBytes(); + ByteUtil.writeInt(msg, pubkey.length); + msg.write(pubkey); + } else { + ByteUtil.writeInt(msg, 0); + } + ByteUtil.writeLong(msg, prevLogIndex); + ByteUtil.writeLong(msg, prevLogTerm); + ByteUtil.writeLong(msg, leaderCommit); + if (entries != null) { + ByteUtil.writeInt(msg, entries.size()); + for (LogEntry entry : entries) { + byte[] entryBytes = entry.getBytes(); + ByteUtil.writeInt(msg, entryBytes.length); + msg.write(entryBytes); + } + } else { + ByteUtil.writeInt(msg, 0); + } + } catch (IOException e) { + e.printStackTrace(); + } + return msg.toByteArray(); + } + + @Override + public IRaftMessage parse(byte[] b) { + AppendEntries msg = new AppendEntries(); + ByteArrayInputStream bi = new ByteArrayInputStream(b); + msg.term = ByteUtil.readLong(bi); + int leaderLen = ByteUtil.readInt(bi); + PubKeyNode node = new PubKeyNode(); + if (leaderLen > 0) { + node.pubkey = new String(ByteUtil.readBytes(bi, leaderLen)); + } + msg.leader = node; + msg.prevLogIndex = ByteUtil.readLong(bi); + msg.prevLogTerm = ByteUtil.readLong(bi); + msg.leaderCommit = ByteUtil.readLong(bi); + int entriesLen = ByteUtil.readInt(bi); + List list = new ArrayList<>(); + if (entriesLen > 0) { + for (int i = 0; i < entriesLen; i++) { + int len = ByteUtil.readInt(bi); + byte[] bytes = ByteUtil.readBytes(bi, len); + LogEntry entry = LogEntry.parse(bytes); + list.add(entry); + } + } + msg.entries = list; + return msg; + } + + public RaftMessageType getType() { + return type; + } + + public long getTerm() { + return term; + } + + public void setTerm(long term) { + this.term = term; + } + + public PubKeyNode getLeader() { + return leader; + } + + public void setLeader(PubKeyNode leader) { + this.leader = leader; + } + + public long getPrevLogIndex() { + return prevLogIndex; + } + + public void setPrevLogIndex(long prevLogIndex) { + this.prevLogIndex = prevLogIndex; + } + + public long getPrevLogTerm() { + return prevLogTerm; + } + + public void setPrevLogTerm(long prevLogTerm) { + this.prevLogTerm = prevLogTerm; + } + + public long getLeaderCommit() { + return leaderCommit; + } + + public void setLeaderCommit(long leaderCommit) { + this.leaderCommit = leaderCommit; + } + + public List getEntries() { + return entries; + } + + public void setEntries(List entries) { + this.entries = entries; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntriesResponse.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntriesResponse.java new file mode 100644 index 0000000..e2c07fc --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/AppendEntriesResponse.java @@ -0,0 +1,103 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +import org.bdware.sc.conn.ByteUtil; +import org.bdware.sc.units.PubKeyNode; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + + +public class AppendEntriesResponse implements IRaftMessage { + RaftMessageType type; + long term; + long lastLogIndex; + boolean success; + + PubKeyNode server; + + public AppendEntriesResponse() { + type = RaftMessageType.AppendEntriesResponse; + } + + public AppendEntriesResponse(long term, long lastLogIndex, boolean success, PubKeyNode server) { + type = RaftMessageType.AppendEntriesResponse; + this.term = term; + this.lastLogIndex = lastLogIndex; + this.success = success; + this.server = server; + } + + @Override + public byte[] getBytes() { + ByteArrayOutputStream msg = new ByteArrayOutputStream(); + ByteUtil.writeLong(msg, term); + ByteUtil.writeLong(msg, lastLogIndex); + ByteUtil.writeBoolean(msg, success); + try { + if (server != null && !server.pubkey.equals("")) { + byte[] pubkey = server.pubkey.getBytes(); + ByteUtil.writeInt(msg, pubkey.length); + msg.write(pubkey); + } else { + ByteUtil.writeInt(msg, 0); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return msg.toByteArray(); + } + + @Override + public IRaftMessage parse(byte[] b) { + AppendEntriesResponse msg = new AppendEntriesResponse(); + ByteArrayInputStream bi = new ByteArrayInputStream(b); + msg.term = ByteUtil.readLong(bi); + msg.lastLogIndex = ByteUtil.readLong(bi); + msg.success = ByteUtil.readBoolean(bi); + int serverLen = ByteUtil.readInt(bi); + PubKeyNode node = new PubKeyNode(); + if (serverLen > 0) { + node.pubkey = new String(ByteUtil.readBytes(bi, serverLen)); + } + msg.server = node; + return msg; + } + + @Override + public RaftMessageType getType() { + return type; + } + + public long getTerm() { + return term; + } + + public void setTerm(long term) { + this.term = term; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public PubKeyNode getServer() { + return server; + } + + public void setServer(PubKeyNode server) { + this.server = server; + } + + public long getLastLogIndex() { + return lastLogIndex; + } + + public void setLastLogIndex(long lastLogIndex) { + this.lastLogIndex = lastLogIndex; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/EmptyMessage.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/EmptyMessage.java new file mode 100644 index 0000000..2ade4c1 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/EmptyMessage.java @@ -0,0 +1,28 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +public class EmptyMessage implements IRaftMessage{ + RaftMessageType type; + + public EmptyMessage() { + type=RaftMessageType.Unknown; + } + + @Override + public byte[] getBytes() { + return new byte[0]; + } + + @Override + public IRaftMessage parse(byte[] b) { + return new EmptyMessage(); + } + + @Override + public RaftMessageType getType() { + return type; + } + + public void setType(RaftMessageType type) { + this.type = type; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/IRaftMessage.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/IRaftMessage.java new file mode 100644 index 0000000..3894c5e --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/IRaftMessage.java @@ -0,0 +1,10 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +public interface IRaftMessage { + + RaftMessageType getType(); + + byte[] getBytes(); + + IRaftMessage parse(byte[] b); +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageParser.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageParser.java new file mode 100644 index 0000000..fd728b2 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageParser.java @@ -0,0 +1,61 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bdware.sc.conn.ByteUtil; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class RaftMessageParser { + IRaftMessage message; + static Logger LOGGER = LogManager.getLogger(RaftMessageParser.class); + + public RaftMessageParser(IRaftMessage message) { + this.message = message; + } + + public byte[] getBytes() { + ByteArrayOutputStream bo = new ByteArrayOutputStream(); + try { + ByteUtil.writeInt(bo, message.getType().toInt()); + byte[] bytes = message.getBytes(); + ByteUtil.writeInt(bo, bytes.length); + bo.write(bytes); + LOGGER.info("[SEND] " + message.getType()); + } catch (IOException e) { + e.printStackTrace(); + } + return bo.toByteArray(); + } + + public static IRaftMessage parse(byte[] b) { + ByteArrayInputStream bi = new ByteArrayInputStream(b); + RaftMessageType type = RaftMessageType.fromInt(ByteUtil.readInt(bi)); + int msgLen = ByteUtil.readInt(bi); + byte[] messageBytes = ByteUtil.readBytes(bi, msgLen); + IRaftMessage msg; + switch (type) { + case RequestVote: + msg = new RequestVote(); + break; + case RequestVoteResponse: + msg = new RequestVoteResponse(); + break; + case AppendEntries: + msg = new AppendEntries(); + break; + case AppendEntriesResponse: + msg = new AppendEntriesResponse(); + break; + default: + msg = new EmptyMessage(); + } + return msg.parse(messageBytes); + } + + public static RaftMessageParser setMessage(IRaftMessage message) { + return new RaftMessageParser(message); + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageType.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageType.java new file mode 100644 index 0000000..af0d17f --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RaftMessageType.java @@ -0,0 +1,35 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +public enum RaftMessageType { + RequestVote(0), + RequestVoteResponse(1), + AppendEntries(2), + AppendEntriesResponse(3), + Unknown(4); + + private final int type; + + RaftMessageType(int i) { + type = i; + } + + public static RaftMessageType fromInt(int i) { + switch (i) { + case 0: + return RequestVote; + case 1: + return RequestVoteResponse; + case 2: + return AppendEntries; + case 3: + return AppendEntriesResponse; + default: + return Unknown; + } + } + + public int toInt() { + return type; + } + +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RequestVote.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RequestVote.java new file mode 100644 index 0000000..82cb511 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RequestVote.java @@ -0,0 +1,101 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +import org.bdware.sc.conn.ByteUtil; +import org.bdware.sc.units.PubKeyNode; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class RequestVote implements IRaftMessage { + RaftMessageType type; + long term; + PubKeyNode candidate; // candidate's Id + long lastLogIndex; + long lastLogTerm; + + public RequestVote() { + type = RaftMessageType.RequestVote; + } + + public RequestVote(long term, PubKeyNode candidate, long lastLogIndex, long lastLogTerm) { + type = RaftMessageType.RequestVote; + this.term = term; + this.candidate = candidate; + this.lastLogIndex = lastLogIndex; + this.lastLogTerm = lastLogTerm; + } + + @Override + public byte[] getBytes() { + ByteArrayOutputStream msg = new ByteArrayOutputStream(); + try { + ByteUtil.writeLong(msg, term); + if (candidate != null && !candidate.pubkey.equals("")) { + byte[] pubkey = candidate.pubkey.getBytes(); + ByteUtil.writeInt(msg, pubkey.length); + msg.write(pubkey); + } else { + ByteUtil.writeInt(msg, 0); + } + ByteUtil.writeLong(msg, lastLogIndex); + ByteUtil.writeLong(msg, lastLogTerm); + } catch (IOException e) { + e.printStackTrace(); + } + return msg.toByteArray(); + } + + @Override + public IRaftMessage parse(byte[] b) { + RequestVote msg = new RequestVote(); + ByteArrayInputStream bi = new ByteArrayInputStream(b); + msg.term = ByteUtil.readLong(bi); + int candidateLen = ByteUtil.readInt(bi); + PubKeyNode node = new PubKeyNode(); + if (candidateLen > 0) { + node.pubkey = new String(ByteUtil.readBytes(bi, candidateLen)); + } + msg.candidate = node; + msg.lastLogIndex = ByteUtil.readLong(bi); + msg.lastLogTerm = ByteUtil.readLong(bi); + return msg; + } + + @Override + public RaftMessageType getType() { + return type; + } + + public long getTerm() { + return term; + } + + public void setTerm(long term) { + this.term = term; + } + + public PubKeyNode getCandidate() { + return candidate; + } + + public void setCandidate(PubKeyNode candidate) { + this.candidate = candidate; + } + + public long getLastLogIndex() { + return lastLogIndex; + } + + public void setLastLogIndex(long lastLogIndex) { + this.lastLogIndex = lastLogIndex; + } + + public long getLastLogTerm() { + return lastLogTerm; + } + + public void setLastLogTerm(long lastLogTerm) { + this.lastLogTerm = lastLogTerm; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RequestVoteResponse.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RequestVoteResponse.java new file mode 100644 index 0000000..baccd39 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/message/RequestVoteResponse.java @@ -0,0 +1,64 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +import org.bdware.sc.conn.ByteUtil; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +public class RequestVoteResponse implements IRaftMessage { + RaftMessageType type; + long term; + boolean granted; + + public RequestVoteResponse() { + type = RaftMessageType.RequestVoteResponse; + } + + public RequestVoteResponse(long term, boolean granted) { + type = RaftMessageType.RequestVoteResponse; + this.term = term; + this.granted = granted; + } + + @Override + public byte[] getBytes() { + ByteArrayOutputStream msg = new ByteArrayOutputStream(); + ByteUtil.writeLong(msg, term); + ByteUtil.writeBoolean(msg, granted); + return msg.toByteArray(); + } + + @Override + public IRaftMessage parse(byte[] b) { + RequestVoteResponse msg = new RequestVoteResponse(); + ByteArrayInputStream bi = new ByteArrayInputStream(b); + msg.term = ByteUtil.readLong(bi); + msg.granted = ByteUtil.readBoolean(bi); + return msg; + } + + @Override + public RaftMessageType getType() { + return type; + } + + public void setType(RaftMessageType type) { + this.type = type; + } + + public long getTerm() { + return term; + } + + public void setTerm(long term) { + this.term = term; + } + + public boolean isGranted() { + return granted; + } + + public void setGranted(boolean granted) { + this.granted = granted; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogEntry.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogEntry.java new file mode 100644 index 0000000..b89cd14 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogEntry.java @@ -0,0 +1,76 @@ +package org.bdware.consistency.plugin.raft.algo.storage; + + +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.conn.ByteUtil; +import org.bdware.sc.util.JsonUtil; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class LogEntry { + + private long term; + private long index; + private ContractRequest command; + + + public long getTerm() { + return term; + } + + public void setTerm(long term) { + this.term = term; + } + + public long getIndex() { + return index; + } + + public void setIndex(long index) { + this.index = index; + } + + public ContractRequest getCommand() { + return command; + } + + public void setCommand(ContractRequest command) { + this.command = command; + } + + public byte[] getBytes() { + ByteArrayOutputStream bo = new ByteArrayOutputStream(); + ByteUtil.writeLong(bo, term); + ByteUtil.writeLong(bo, index); + byte[] dataByte = JsonUtil.toJson(command).getBytes(StandardCharsets.UTF_8); + try { + if (dataByte != null && dataByte.length != 0) { + ByteUtil.writeInt(bo, dataByte.length); + bo.write(dataByte); + } else { + ByteUtil.writeInt(bo, 0); + } + } catch (IOException e) { + e.printStackTrace(); + } + return bo.toByteArray(); + } + + public static LogEntry parse(byte[] bytes) { + LogEntry entry = new LogEntry(); + ByteArrayInputStream bi = new ByteArrayInputStream(bytes); + entry.term = ByteUtil.readLong(bi); + entry.index = ByteUtil.readLong(bi); + int dataLen = ByteUtil.readInt(bi); + if (dataLen > 0) { + byte[] dataByte = ByteUtil.readBytes(bi, dataLen); + entry.command = JsonUtil.fromJson(new String(dataByte, StandardCharsets.UTF_8), ContractRequest.class); + } else { + entry.command = null; + } + return entry; + } +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogManager.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogManager.java new file mode 100644 index 0000000..9f4bda4 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/LogManager.java @@ -0,0 +1,188 @@ +package org.bdware.consistency.plugin.raft.algo.storage; + +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.raft.algo.util.FileUtil; +import org.bdware.sc.bean.ContractRequest; + +import java.io.File; +import java.io.IOException; +import java.util.*; + + +public class LogManager { + private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(LogManager.class); + + private final String logDir; + private final TreeMap segmentStartIndexMap = new TreeMap<>(); // key is start index, value is file name of the segment + private int maxSegmentFileSize; + private Segment lastSegment; + + + public LogManager(String raftDataDir, int maxSegmentFileSize) { + this.logDir = raftDataDir + File.separator + "log"; + this.maxSegmentFileSize = maxSegmentFileSize; + File dir = new File(logDir); + if (!dir.exists()) { + dir.mkdirs(); + } + fillMap(dir); + if (!segmentStartIndexMap.isEmpty()) { + lastSegment = Segment.loadSegment(logDir, segmentStartIndexMap.lastEntry().getValue()); + LOGGER.info("SegmentStart not Null!" + this.getLastEntryIndex()); + } else { + LogEntry first = new LogEntry(); + first.setIndex(0); + first.setTerm(0); + first.setCommand(new ContractRequest()); + append(first); + LOGGER.info("SegmentStart is Null!" + this.getLastEntryIndex()); + } + } + + public LogEntry getLastEntry() { + if (lastSegment == null) { + return null; + } + return lastSegment.getLastEntry(); + } + + public long getLastEntryIndex() { + LogEntry entry = getLastEntry(); + if (entry == null) { + return -1; + } + return entry.getIndex(); + } + + public LogEntry getEntry(long index) { + if (lastSegment == null || index < 0 || index > lastSegment.getEndIndex()) { + return null; + } + if (index >= lastSegment.getStartIndex() && index <= lastSegment.getEndIndex()) { + return lastSegment.getEntry(index); + } + Segment segment = Segment.loadSegment(logDir, segmentStartIndexMap.floorEntry(index).getValue()); + return segment.getEntry(index); + } + + public long append(LogEntry entry) { + long newLastLogIndex = this.getLastEntryIndex(); + newLastLogIndex++; + byte[] entryBytes = entry.getBytes(); + LOGGER.info("AppendLogEntry: last=" + this.getLastEntryIndex() + " -> toAdd=" + entry.getIndex()); + + try { + if (segmentStartIndexMap.isEmpty() || lastSegment == null || + lastSegment.getFileSize() + entryBytes.length > maxSegmentFileSize) { + // close file and rename + if (lastSegment != null) { + FileUtil.closeFile(lastSegment.getRandomAccessFile()); + String newFileName = String.format("%016x-%016x", + lastSegment.getStartIndex(), lastSegment.getEndIndex()); + String newFullFileName = logDir + File.separator + newFileName; + File newFile = new File(newFullFileName); + String oldFullFileName = logDir + File.separator + lastSegment.getFileName(); + File oldFile = new File(oldFullFileName); + FileUtils.moveFile(oldFile, newFile); + segmentStartIndexMap.put(lastSegment.getStartIndex(), newFileName); + } + String segmentFileName = "open-" + String.format("%016x", newLastLogIndex); + lastSegment = new Segment(logDir, segmentFileName); + segmentStartIndexMap.put(newLastLogIndex, segmentFileName); + } + if (entry.getIndex() == 0) { + entry.setIndex(newLastLogIndex); + } + if (entry.getIndex() != newLastLogIndex) { + LOGGER.warn("received invalid log index: {}", entry.getIndex()); + throw new RuntimeException("received invalid log index: {}" + entry.getIndex()); + } + lastSegment.append(entry); + } catch (IOException e) { + e.printStackTrace(); + } + return newLastLogIndex; + } + + public long append(List entries) { + long newLastLogIndex = this.getLastEntryIndex(); + for (LogEntry entry : entries) { + newLastLogIndex = append(entry); + } + return newLastLogIndex; + } + + public List getEntriesStartAt(long logIndex) { + Map.Entry floorEntry = segmentStartIndexMap.floorEntry(logIndex); + if (floorEntry == null) { + LOGGER.warn("received invalid log index: {}", logIndex); + throw new RuntimeException("received invalid log index: {}" + logIndex); + } + SortedMap tailMap = segmentStartIndexMap.tailMap(floorEntry.getKey()); + List entries = new ArrayList<>(); + tailMap.forEach((key, value) -> { + Segment segment = Segment.loadSegment(logDir, value); + entries.addAll(segment.getEntriesNotLessThan(logIndex)); + segment.close(); + }); + return entries; + } + + public void deleteEntriesStartAt(long logIndex) { + LOGGER.info("deleteEntry:" + logIndex); + + Map.Entry floorEntry = segmentStartIndexMap.floorEntry(logIndex - 1); + long floorKey; + if (floorEntry == null) { + floorKey = -1; + lastSegment = null; + } else { + floorKey = floorEntry.getKey(); + if (lastSegment.getStartIndex() != floorKey) { + lastSegment.close(); + lastSegment = Segment.loadSegment(logDir, floorEntry.getValue()); + lastSegment.setMode("rw"); + } + lastSegment.deleteEntriesStartAt(logIndex); + segmentStartIndexMap.put(lastSegment.getStartIndex(), lastSegment.getFileName()); + } + Map.Entry lastMapEntry = segmentStartIndexMap.lastEntry(); + while (lastMapEntry != null && lastMapEntry.getKey() > floorKey) { + FileUtil.deleteFile(logDir, lastMapEntry.getValue()); + segmentStartIndexMap.remove(lastMapEntry.getKey()); + lastMapEntry = segmentStartIndexMap.lastEntry(); + } + } + + public void setMaxSegmentFileSize(int maxSegmentFileSize) { + this.maxSegmentFileSize = maxSegmentFileSize; + } + + private void fillMap(File dir) { + if (dir == null) return; + File[] files = dir.listFiles(); + if (files == null) { + LOGGER.warn("get file in dir({}) error", dir.getName()); + return; + } + for (File file : files) { + if (file.isFile()) { + long startIndexInFile; + String fileName = file.getName(); + String[] splitArray = fileName.split("-"); + if (splitArray.length != 2) { + LOGGER.warn("segment filename[{}] is not valid", fileName); + throw new RuntimeException("segment filename[{}] is not valid" + fileName); + } + if (splitArray[0].equals("open")) { + startIndexInFile = Long.parseLong(splitArray[1], 16); + } else { + startIndexInFile = Long.parseLong(splitArray[0], 16); + } + segmentStartIndexMap.put(startIndexInFile, fileName); + } + } + } + +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/Segment.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/Segment.java new file mode 100644 index 0000000..9a29b4b --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/Segment.java @@ -0,0 +1,185 @@ +package org.bdware.consistency.plugin.raft.algo.storage; + +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.raft.algo.util.FileUtil; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.List; + +public class Segment { + private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(Segment.class); + private long startIndex; + private long endIndex; + private long fileSize; + private String dir; + private String fileName; + private RandomAccessFile randomAccessFile; + private List records; + + public void close() { + FileUtil.closeFile(randomAccessFile); + } + + public void setMode(String mode) { + FileUtil.closeFile(randomAccessFile); + randomAccessFile = FileUtil.openFile(dir, fileName, mode); + } + + public static class Record { + public long offset; + public LogEntry entry; + + public Record(long offset, LogEntry entry) { + this.offset = offset; + this.entry = entry; + } + } + + // Create new segment file + public Segment(String dir, String name) { + this.dir = dir; + fileName = name; + records = new ArrayList<>(); + } + + public Segment(String dir) { + this.dir = dir; + records = new ArrayList<>(); + } + + public LogEntry getEntry(long index) { + if (records.size() == 0 || index < startIndex || index > endIndex) { + return null; + } + int indexInList = (int) (index - startIndex); + return records.get(indexInList).entry; + } + + public List getEntriesNotLessThan(long index) { + List entries = new ArrayList<>(); + if (records.size() == 0 || index > endIndex) { + return entries; + } + int indexInList = Math.max(0, (int) (index - startIndex)); + for (int i = indexInList; i < records.size(); i++) { + entries.add(records.get(i).entry); + } + return entries; + } + + public void deleteEntriesStartAt(long index) { + if (records.size() == 0 || index > endIndex) { + return; + } + int indexInList = (int) (index - startIndex); + if (indexInList <= 0) { + records.clear(); + FileUtil.closeFile(randomAccessFile); + FileUtil.deleteFile(dir, fileName); + fileSize = 0; + return; + } + endIndex = index - 1; + fileSize = records.get(indexInList).offset; + records.removeAll(records.subList(indexInList, records.size())); + FileUtil.truncate(randomAccessFile, fileSize); + FileUtil.closeFile(randomAccessFile); + String oldFullFileName = dir + File.separator + fileName; + fileName = String.format("open-%016x", startIndex); + String newFullFileName = dir + File.separator + fileName; + FileUtil.moveFile(oldFullFileName, newFullFileName); + randomAccessFile = FileUtil.openFile(dir, fileName, "rw"); + } + + public List getEntries() { + return getEntriesNotLessThan(startIndex); + } + + public LogEntry getLastEntry() { + return getEntry(endIndex); + } + + public void append(LogEntry entry) { + if (randomAccessFile == null) { + randomAccessFile = FileUtil.createFile(dir, fileName, "rw"); + } + if (records.isEmpty()) { + startIndex = entry.getIndex(); + } + try { + records.add(new Record(randomAccessFile.getFilePointer(), entry)); + FileUtil.writeLogEntryToFile(randomAccessFile, entry); + endIndex = entry.getIndex(); + fileSize = randomAccessFile.length(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static Segment loadSegment(String dir, String fileName) { + Segment segment = new Segment(dir); + try { + String[] splitArray = fileName.split("-"); + if (splitArray.length != 2) { + LOGGER.warn("segment filename[{}] is not valid", fileName); + throw new RuntimeException("segment filename[{}] is not valid" + fileName); + } + segment.fileName = fileName; + if (splitArray[0].equals("open")) { + segment.randomAccessFile = FileUtil.openFile(dir, fileName, "rw"); + } else { + segment.randomAccessFile = FileUtil.openFile(dir, fileName, "r"); + } + segment.fileSize = segment.randomAccessFile.length(); // ioException + long offset = 0; + while (offset < segment.fileSize) { + LogEntry entry = FileUtil.readLogEntryFromFile(segment.randomAccessFile); + if (entry == null) { + throw new RuntimeException("read segment log failed"); + } + Record record = new Record(offset, entry); + segment.records.add(record); + offset = segment.randomAccessFile.getFilePointer(); // ioException + } + int entrySize = segment.records.size(); + if (entrySize > 0) { + segment.startIndex = segment.getRecords().get(0).entry.getIndex(); + segment.endIndex = segment.getRecords().get(entrySize - 1).entry.getIndex(); + } + return segment; + } catch (IOException ioException) { + LOGGER.warn("loadSegment exception:", ioException); + throw new RuntimeException("loadSegment error" + ioException); + } + } + + public long getStartIndex() { + return startIndex; + } + + public long getEndIndex() { + return endIndex; + } + + public long getFileSize() { + return fileSize; + } + + public String getFileName() { + return fileName; + } + + public RandomAccessFile getRandomAccessFile() { + return randomAccessFile; + } + + public List getRecords() { + return records; + } + +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/StateManager.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/StateManager.java new file mode 100644 index 0000000..fd93f63 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/storage/StateManager.java @@ -0,0 +1,26 @@ +package org.bdware.consistency.plugin.raft.algo.storage; + +import org.bdware.sc.units.PubKeyNode; + +// todo +public class StateManager { + String filename; + static long currentTerm; + static PubKeyNode node; + public static long loadCurrentTerm() { + return currentTerm; + } + + public static PubKeyNode loadVotedFor() { + return node; + } + + public static void saveCurrentTerm(long currentTerm) { + StateManager.currentTerm = currentTerm; + } + + public static void saveVotedFor(PubKeyNode node) { + StateManager.node = node; + } + +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/util/FileUtil.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/util/FileUtil.java new file mode 100644 index 0000000..5ceb835 --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/util/FileUtil.java @@ -0,0 +1,143 @@ +package org.bdware.consistency.plugin.raft.algo.util; + +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; +import org.bdware.consistency.plugin.raft.algo.message.IRaftMessage; +import org.bdware.consistency.plugin.raft.algo.storage.LogEntry; +import org.bdware.consistency.plugin.raft.algo.storage.Segment; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Method; +import java.nio.channels.FileChannel; +import java.util.zip.CRC32; + +public class FileUtil { + private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger(FileUtil.class); + + public static RandomAccessFile createFile(String dir, String fileName, String mode) { + try { + String fullFileName = dir + File.separator + fileName; + File file = new File(fullFileName); + file.createNewFile(); + return new RandomAccessFile(file, mode); + } catch (FileNotFoundException e) { + LOGGER.warn("file not fount, file={}", fileName); + throw new RuntimeException("file not found, file=" + fileName); + } catch (IOException e) { + LOGGER.warn("create file error, msg={}", e.getMessage()); + throw new RuntimeException("create file error, msg={}" + e.getMessage()); + } + } + + public static RandomAccessFile openFile(String dir, String fileName, String mode) { + try { + String fullFileName = dir + File.separator + fileName; + File file = new File(fullFileName); + return new RandomAccessFile(file, mode); + } catch (FileNotFoundException e) { + LOGGER.warn("file not fount, file={}", fileName); + throw new RuntimeException("file not found, file=" + fileName); + } + } + + + public static void closeFile(RandomAccessFile randomAccessFile) { + try { + if (randomAccessFile != null) { + randomAccessFile.close(); + } + } catch (IOException e) { + LOGGER.warn("close file error, msg={}", e.getMessage()); + throw new RuntimeException("close file error, msg={}" + e.getMessage()); + } + } + + public static void deleteFile(String dir, String fileName) { + String fullFileName = dir + File.separator + fileName; + File file = new File(fullFileName); + try { + FileUtils.forceDelete(file); + } catch (IOException e) { + LOGGER.warn("delete file error, msg={}", e.getMessage()); + throw new RuntimeException("delete file error, msg={}" + e.getMessage()); + } + } + + public static void moveFile(String oldFullFileName, String newFullFileName) { + File oldFile = new File(oldFullFileName); + File newFile = new File(newFullFileName); + try { + FileUtils.moveFile(oldFile, newFile); + } catch (IOException e) { + LOGGER.warn("move file error, msg={}", e.getMessage()); + throw new RuntimeException("move file error, msg={}" + e.getMessage()); + } + } + + public static void truncate(RandomAccessFile randomAccessFile, long size) { + FileChannel fileChannel = randomAccessFile.getChannel(); + try { + fileChannel.truncate(size); + } catch (IOException e) { + LOGGER.warn("truncate file error, msg={}", e.getMessage()); + throw new RuntimeException("truncate file error, msg={}" + e.getMessage()); + } finally { + try { + fileChannel.close(); + } catch (IOException e) { + LOGGER.warn("close fileChannel error, msg={}", e.getMessage()); + throw new RuntimeException("close fileChannel error, msg={}" + e.getMessage()); + } + } + } + + public static LogEntry readLogEntryFromFile(RandomAccessFile raf) { + try { + long crc32FromFile = raf.readLong(); + int dataLen = raf.readInt(); + int hasReadLen = (Long.SIZE + Integer.SIZE) / Byte.SIZE; + if (raf.length() - hasReadLen < dataLen) { + LOGGER.warn("file remainLength < dataLen"); + return null; + } + byte[] data = new byte[dataLen]; + int readLen = raf.read(data); + if (readLen != dataLen) { + LOGGER.warn("readLen != dataLen"); + return null; + } + long crc32FromData = getCRC32(data); + if (crc32FromFile != crc32FromData) { + LOGGER.warn("crc32 check failed"); + return null; + } + return LogEntry.parse(data); + } catch (Exception ex) { + LOGGER.warn("readLogEntryFromFile meet exception, {}", ex.getMessage()); + return null; + } + } + + public static void writeLogEntryToFile(RandomAccessFile raf, LogEntry entry) { + byte[] bytes = entry.getBytes(); + long crc32 = getCRC32(bytes); + try { + raf.writeLong(crc32); + raf.writeInt(bytes.length); + raf.write(bytes); + } catch (IOException ex) { + LOGGER.warn("write LogEntry to file error, msg={}", ex.getMessage()); + throw new RuntimeException("write LogEntry to file error"); + } + } + + public static long getCRC32(byte[] data) { + CRC32 crc32 = new CRC32(); + crc32.update(data); + return crc32.getValue(); + } + +} diff --git a/src/main/java/org/bdware/consistency/plugin/raft/algo/util/HeartBeatUtil.java b/src/main/java/org/bdware/consistency/plugin/raft/algo/util/HeartBeatUtil.java new file mode 100644 index 0000000..d96e2ff --- /dev/null +++ b/src/main/java/org/bdware/consistency/plugin/raft/algo/util/HeartBeatUtil.java @@ -0,0 +1,51 @@ +package org.bdware.consistency.plugin.raft.algo.util; + +import org.bdware.sc.ContractManager; + +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class HeartBeatUtil { + private static HeartBeatUtil instance; + Map recordedFuture; + + private HeartBeatUtil() { + recordedFuture = new ConcurrentHashMap<>(); + } + + public static HeartBeatUtil getInstance() { + if (null == instance) { + instance = new HeartBeatUtil(); + } + return instance; + } + + public synchronized void cancel(TimerTask timerTask) { + try { + if (recordedFuture.containsKey(timerTask)) { + ScheduledFuture future = recordedFuture.get(timerTask); + future.cancel(false); + recordedFuture.remove(timerTask); + timerTask.cancel(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public synchronized void schedule(TimerTask timerTask, int delay, int period) { + try { + if (!recordedFuture.containsKey(timerTask)) { + ScheduledFuture future = + ContractManager.scheduledThreadPool.scheduleWithFixedDelay( + timerTask, delay, period, TimeUnit.MILLISECONDS); + recordedFuture.put(timerTask, future); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/org/bdware/consistency/plugin/raft/algo/message/MessageTest.java b/src/test/java/org/bdware/consistency/plugin/raft/algo/message/MessageTest.java new file mode 100644 index 0000000..0dae1ba --- /dev/null +++ b/src/test/java/org/bdware/consistency/plugin/raft/algo/message/MessageTest.java @@ -0,0 +1,91 @@ +package org.bdware.consistency.plugin.raft.algo.message; + +import org.bdware.consistency.plugin.raft.algo.storage.LogEntry; +import org.bdware.sc.bean.ContractRequest; +import org.bdware.sc.units.PubKeyNode; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class MessageTest { + @Test + public void testRequestVote() { + RequestVote msg = new RequestVote(); + msg.setTerm(98567); + PubKeyNode node = new PubKeyNode(); + node.pubkey = "kjfhkefbviury"; + msg.setCandidate(node); + msg.setLastLogTerm(8956737); + msg.setLastLogIndex(42956); + + byte[] bytes = RaftMessageParser.setMessage(msg).getBytes(); + RequestVote message = (RequestVote) RaftMessageParser.parse(bytes); + + assert msg.getTerm() == message.getTerm(); + assert msg.getCandidate().pubkey.equals(message.getCandidate().pubkey); + assert msg.getLastLogTerm() == message.getLastLogTerm(); + assert msg.getLastLogIndex() == message.getLastLogIndex(); + } + + @Test + public void testRequestVoteResponse() { + RequestVoteResponse msg = new RequestVoteResponse(); + msg.setTerm(4563657); + msg.setGranted(true); + + byte[] bytes = RaftMessageParser.setMessage(msg).getBytes(); + RequestVoteResponse message = (RequestVoteResponse) RaftMessageParser.parse(bytes); + + assert msg.getTerm() == message.getTerm(); + assert msg.isGranted() == message.isGranted(); + } + + @Test + public void testAppendEntries() { + AppendEntries msg = new AppendEntries(); + msg.setTerm(353); + PubKeyNode node = new PubKeyNode(); + node.pubkey = "ksuhgkjfjksdbfchj"; + msg.setLeader(node); + msg.setPrevLogIndex(298476957); + msg.setPrevLogTerm(5467); + msg.setLeaderCommit(586793); + List entries = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + LogEntry entry = new LogEntry(); + entry.setTerm(i); + entry.setIndex(i * 100); + ContractRequest command = new ContractRequest(); + command.setAction(String.format("%20d-%20d", i, i * 100)); + entry.setCommand(command); + entries.add(entry); + } + msg.setEntries(entries); + + byte[] bytes = RaftMessageParser.setMessage(msg).getBytes(); + AppendEntries message = (AppendEntries) RaftMessageParser.parse(bytes); + + assert msg.getTerm() == message.getTerm(); + assert msg.getLeader().pubkey.equals(message.getLeader().pubkey); + assert msg.getPrevLogIndex() == message.getPrevLogIndex(); + assert msg.getPrevLogTerm() == message.getPrevLogTerm(); + assert msg.getLeaderCommit() == message.getLeaderCommit(); + assert msg.getEntries().size() == message.getEntries().size(); + assert msg.getEntries().get(0).getCommand().getAction().equals(message.getEntries().get(0).getCommand().getAction()); + assert msg.getEntries().get(9).getCommand().getAction().equals(message.getEntries().get(9).getCommand().getAction()); + } + + @Test + public void testAppendEntriesResponse() { + AppendEntriesResponse msg = new AppendEntriesResponse(); + msg.setTerm(5349679); + msg.setSuccess(true); + + byte[] bytes = RaftMessageParser.setMessage(msg).getBytes(); + AppendEntriesResponse message = (AppendEntriesResponse) RaftMessageParser.parse(bytes); + + assert msg.getTerm() == message.getTerm(); + assert msg.isSuccess() == message.isSuccess(); + } +} diff --git a/src/test/java/org/bdware/consistency/plugin/raft/algo/storage/LogManagerTest.java b/src/test/java/org/bdware/consistency/plugin/raft/algo/storage/LogManagerTest.java new file mode 100644 index 0000000..9930be4 --- /dev/null +++ b/src/test/java/org/bdware/consistency/plugin/raft/algo/storage/LogManagerTest.java @@ -0,0 +1,66 @@ +package org.bdware.consistency.plugin.raft.algo.storage; + +import org.bdware.sc.bean.ContractRequest; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +public class LogManagerTest { + @Test + public void testLogManager() { + LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); + List entries = new ArrayList<>(); + LogEntry entry; + for (int i = 0; i < 1000; i++) { + entry = new LogEntry(); + entry.setTerm(i / 10); + entry.setIndex(i); + ContractRequest command = new ContractRequest(); + command.setAction(String.format("%20d-%20d", i, i * 100)); + entry.setCommand(command); + entries.add(entry); + } + long idx = logManager.append(entries); + assert idx == 999; + } + + @Test + public void testLogManager2() { + LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); + LogEntry entry = logManager.getEntry(386); + assert entry.getTerm() == 38; + assert entry.getIndex() == 386; + assert entry.getCommand().getAction().equals(String.format("%20d-%20d", 386, 386 * 100)); + entry = logManager.getLastEntry(); + assert entry.getTerm() == 99; + assert entry.getIndex() == 999; + assert entry.getCommand().getAction().equals(String.format("%20d-%20d", 999, 999 * 100)); + entry = logManager.getEntry(0); + assert entry.getTerm() == 0; + assert entry.getIndex() == 0; + assert entry.getCommand().getAction().equals(String.format("%20d-%20d", 0, 0)); + List entries = new ArrayList<>(); + for (int i = 1000; i < 1100; i++) { + entry = new LogEntry(); + entry.setTerm(i / 10); + entry.setIndex(i); + ContractRequest command = new ContractRequest(); + command.setAction(String.format("%20d-%20d", i, i * 100)); + entry.setCommand(command); + entries.add(entry); + } + long index = logManager.append(entries); + assert index == 1099; + } + @Test + public void testLogManager3() { + LogManager logManager = new LogManager(System.getProperty("user.dir") + File.separator + "data", 100000); + List entryList = logManager.getEntriesStartAt(677); + assert entryList.size() == 1100 - 677; + assert entryList.get(100).getIndex() == 777; + logManager.deleteEntriesStartAt(983); + assert logManager.getLastEntryIndex() == 982; + } +}